In the code below, the worker function checks if the data passed is valid and if it is valid, it returns a dictionary which will be used in a bulk SQLAlchemy Core insert. If its invalid, I want the None
value not to be added to the receiving_list
because if it is, the bulk insert will fail as a single None
value cannot map out to the table structure.
from datetime import datetime
from sqlalchemy import Table
import multiprocessing
CONN = Engine.connect() #Engine is imported from another module
NUM_CONSUMERS = multiprocessing.cpu_count()
p = multiprocessing.Pool(NUM_CONSUMERS)
def process_data(data):
#Long process to validate data
if is_valid_data(data) == True:
returned_dict = {}
returned_dict['created_at'] = datetime.now()
returned_dict['col1'] = data[0]
returned_dict['colN'] = data[N]
return returned_dict
else:
return None
def spawn_some_processes(data):
table_to_insert = Table('postgresql_database_table', meta, autoload=True, autoload_with=Engine)
While True:
#Get some data here and pass it on to the worker
receiving_list = p.map(process_data, data_to_process)
try:
if len(receiving_list) > 0:
trans = CONN.begin()
CONN.execute(table_to_insert.insert(), receiving_list)
trans.commit()
except IntegrityError:
trans.rollback()
except:
trans.rollback()
Trying to rephrase the question, how can I stop a spawned process from adding to receiving_list
when the value None
is returned by the spawned process?
A workaround is incorporating a queue with queue.put()
and queue.get()
that will put
only valid data. The disadvantage with this is that after the processes are over, I have to then unpack
the queue which adds overhead. My ideal solution would be one where a clean list of dictionaries is returned which SQLAlchemy can use to do the bulk insert