Control the value returned by a spawned process

Posted 2019-08-29 10:26

Question:

In the code below, the worker function checks whether the data passed to it is valid. If it is, it returns a dictionary that will be used in a bulk SQLAlchemy Core insert. If it is invalid, it returns None, and I don't want that None added to receiving_list: a single None value cannot be mapped to the table's columns, so the whole bulk insert fails.

from datetime import datetime
import multiprocessing

from sqlalchemy import MetaData, Table
from sqlalchemy.exc import IntegrityError

# Engine and is_valid_data are imported from another module
NUM_CONSUMERS = multiprocessing.cpu_count()


def process_data(data):
    # Long process to validate data
    if is_valid_data(data):
        returned_dict = {}
        returned_dict['created_at'] = datetime.now()
        returned_dict['col1'] = data[0]
        # ... one entry per remaining column, up to colN
        returned_dict['colN'] = data[N]
        return returned_dict
    else:
        return None


def spawn_some_processes(p):
    meta = MetaData()
    table_to_insert = Table('postgresql_database_table', meta, autoload_with=Engine)
    with Engine.connect() as conn:
        while True:
            # Get some data here (data_to_process) and pass it on to the workers
            receiving_list = p.map(process_data, data_to_process)

            if receiving_list:
                trans = conn.begin()
                try:
                    conn.execute(table_to_insert.insert(), receiving_list)
                    trans.commit()
                except IntegrityError:
                    trans.rollback()
                except Exception:
                    trans.rollback()
                    raise


if __name__ == '__main__':
    # Create the pool only after process_data is defined, so the workers can find it
    with multiprocessing.Pool(NUM_CONSUMERS) as p:
        spawn_some_processes(p)

To rephrase the question: how can I stop a spawned process from adding an entry to receiving_list when it returns None?

One workaround is to use a queue with queue.put() and queue.get() so that only valid data is put on it. The disadvantage is that after the processes finish, I then have to unpack the queue, which adds overhead. My ideal solution would return a clean list of dictionaries that SQLAlchemy can use directly for the bulk insert.
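The reason the worker cannot suppress its own result is that Pool.map always returns exactly one result per input item, in input order, so a worker that returns None leaves a None in the result list. The sketch below illustrates this. It uses multiprocessing.dummy.Pool (a thread-backed pool with the same map() interface) only so the example runs without a `__main__` guard, and fake_validate is a hypothetical stand-in for process_data.

```python
from multiprocessing.dummy import Pool  # thread-backed pool with the same map() API


def fake_validate(item):
    """Hypothetical stand-in for process_data: even numbers count as valid."""
    if item % 2 == 0:
        return {"col1": item}
    return None  # an invalid input still occupies a result slot


with Pool(4) as pool:
    results = pool.map(fake_validate, [1, 2, 3, 4])

# map() keeps one slot per input, in order, so the Nones are still there.
print(results)  # [None, {'col1': 2}, None, {'col1': 4}]
```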

Answer 1:

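You can simply remove the None entries from the list that map returns. In Python 3, filter() returns a lazy iterator rather than a list, so wrap it in list() before handing it to SQLAlchemy:

receiving_list = list(filter(None, p.map(process_data, data_to_process)))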
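To show the filtered list feeding a bulk insert end to end, here is a sketch that swaps in the standard library's sqlite3 executemany() for SQLAlchemy Core's execute(table.insert(), list_of_dicts), since both accept a list of dictionaries keyed by column name. The validator fake_validate and the in-memory table are hypothetical, and the thread-backed pool stands in for multiprocessing.Pool (with a real process pool, create it under `if __name__ == '__main__':`).

```python
import sqlite3
from multiprocessing.dummy import Pool  # stand-in for multiprocessing.Pool


def fake_validate(item):
    """Hypothetical validator: returns a row dict for valid input, else None."""
    if item > 0:
        return {"col1": item, "colN": item * 10}
    return None


with Pool(4) as pool:
    # list() matters in Python 3: filter() on its own returns a lazy iterator.
    receiving_list = list(filter(None, pool.map(fake_validate, [3, -1, 5, 0])))

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE postgresql_database_table (col1 INTEGER, colN INTEGER)")
if receiving_list:
    with conn:  # commits on success, rolls back if an exception escapes
        conn.executemany(
            "INSERT INTO postgresql_database_table (col1, colN) VALUES (:col1, :colN)",
            receiving_list,
        )

rows = conn.execute(
    "SELECT col1, colN FROM postgresql_database_table ORDER BY col1"
).fetchall()
print(rows)  # [(3, 30), (5, 50)]
```

Only the two valid rows reach the table; the None results for -1 and 0 never get to executemany().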

This is pretty quick even for really huge lists:

>>> timeit.timeit('l = filter(None, l)', 'l = range(0,10000000)', number=1)
0.47683095932006836
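That timing appears to come from Python 2, where filter() builds a list. With Python 3's lazy filter(), timing `filter(None, l)` alone only measures creating an iterator, so the list() call has to be inside the timed statement. A sketch for re-measuring (results vary by machine, so no numbers are shown):

```python
import timeit

setup = "l = range(0, 10000000)"

# Python 3: this only builds the lazy filter object; no elements are examined.
lazy = timeit.timeit("filter(None, l)", setup, number=1)

# This forces the iteration, which is the work the bulk insert actually needs.
eager = timeit.timeit("list(filter(None, l))", setup, number=1)

print(f"lazy: {lazy:.6f}s, eager: {eager:.6f}s")
```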

Note that filter(None, ...) removes every item for which bool(val) is False, such as empty strings, empty lists, empty dicts and 0, not just None. That should be fine for your use case, though, since process_data returns either a populated dict or None.
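If a worker could legitimately return a falsy value that should still be inserted, filtering on `is not None` removes only the None placeholders. A small comparison with made-up values:

```python
results = [{"col1": 1}, None, {}, 0, "", {"col1": 2}]

# filter(None, ...) drops every falsy item: None, {}, 0 and "".
truthy_only = list(filter(None, results))

# A comprehension with "is not None" drops only the None entries.
not_none = [row for row in results if row is not None]

print(truthy_only)  # [{'col1': 1}, {'col1': 2}]
print(not_none)     # [{'col1': 1}, {}, 0, '', {'col1': 2}]
```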