I am currently playing around with multiprocessing and queues. I have written a piece of code to export data from MongoDB, map it into a relational (flat) structure, convert all values to strings and insert them into MySQL.
Each of these steps is submitted as a process and given import/export queues, save for the MongoDB export, which is handled in the parent.
As you will see below, I use queues, and child processes terminate themselves when they read None from the queue. The problem I currently have is that if a child process runs into an unhandled exception, the parent does not notice and the rest just keeps running. What I want is for the whole shebang to quit and, ideally, to re-raise the child's error.
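The child side of the None-sentinel protocol described above usually reduces to a loop like the one below. This is a minimal sketch, not the asker's mapper class: `consume` and `transform` are hypothetical names, and in the question this loop would sit inside the `run()` method of a `multiprocessing.Process` subclass. It should run unchanged on Python 2.7 and 3.

```python
def consume(in_q, out_q, transform):
    """Apply `transform` to every item from in_q until the None sentinel arrives.

    Hypothetical helper; in the question this loop lives in a Process.run().
    """
    # iter(callable, sentinel) keeps calling in_q.get() and stops as soon as
    # it returns None, so the worker leaves its loop (and then its process).
    for item in iter(in_q.get, None):
        out_q.put(transform(item))
```

If `transform` raises, the exception ends this loop and the child process, but nothing travels through the queues to tell the parent, which is exactly the failure mode described above.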
I have two questions:
- How do I detect the child error in the parent?
- How do I kill my child processes after detecting the error (best practice)? I realize that putting None on the queue to kill the children is pretty dirty.
I am using Python 2.7.
Here are the essential parts of my code:
import multiprocessing
import time

# Establish communication queues
mongo_input_result_q = multiprocessing.Queue()
mapper_result_q = multiprocessing.Queue()
converter_result_q = multiprocessing.Queue()
# [...]
# create child processes
# all processes generated here are subclasses of "multiprocessing.Process"
# create mapper
mappers = [mongo_relational_mapper.MongoRelationalMapper(mongo_input_result_q, mapper_result_q, columns, 1000)
           for i in range(10)]
# create datatype converter, converts everything to str
converters = [datatype_converter.DatatypeConverter(mapper_result_q, converter_result_q, 'str', 1000)
              for i in range(10)]
# create mysql writer
# I create a list of writers. currently only one,
# but I have the option to parallelize it further
writers = [mysql_inserter.MySqlWriter(mysql_host, mysql_user, mysql_passwd, mysql_schema,
                                      converter_result_q, columns, 'w_' + mysql_table, 1000)
           for i in range(1)]
# starting mapper
for mapper in mappers:
    mapper.start()
time.sleep(1)
# starting converter
for converter in converters:
    converter.start()
# starting writer
for writer in writers:
    writer.start()
# [... initializing mongo db connection ...]
# put each dataset read to queue for the mapper
for row in mongo_collection.find({inc_column: {"$gte": start}}):
    mongo_input_result_q.put(row)
    count += 1
    if count % log_counter == 0:
        print 'Mongo Reader' + " " + str(count)
print "MongoReader done"
# Processes are terminated when they read "None" object from queue
# now that reading is finished, put None for each mapper in the queue so they terminate themselves
# the same for all followup processes
for mapper in mappers:
    mongo_input_result_q.put(None)
for mapper in mappers:
    mapper.join()
for converter in converters:
    mapper_result_q.put(None)
for converter in converters:
    converter.join()
for writer in writers:
    converter_result_q.put(None)
for writer in writers:
    writer.join()
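One signal the parent already has without any extra plumbing is `multiprocessing.Process.exitcode`: it is None while the child runs, 0 after a clean exit, 1 when an exception escaped `run()`, and negative when the child was killed by a signal. The sketch below polls it instead of blocking in `join()`; `wait_all_or_fail` and `poll_interval` are hypothetical names, not part of the question's code.

```python
import time


def wait_all_or_fail(procs, poll_interval=0.5):
    """Wait for every process in `procs`; stop them all if one fails.

    Hypothetical helper. Reading p.exitcode polls the child without
    blocking, so a crashed child is noticed while the others still run.
    """
    while True:
        failed = [p for p in procs if p.exitcode not in (None, 0)]
        if failed:
            # Stop the survivors, then reap everybody.
            for p in procs:
                if p.is_alive():
                    p.terminate()
            for p in procs:
                p.join()
            raise RuntimeError("%s exited with code %s"
                               % (failed[0].name, failed[0].exitcode))
        if all(p.exitcode == 0 for p in procs):
            return
        time.sleep(poll_interval)
```

In the code above, each put-None loop could stay as it is and the following join loop could become `wait_all_or_fail(mappers)` (and likewise for converters and writers). The multiprocessing docs warn that `terminate()` can leave a queue the child was using unusable, which is acceptable here because the whole pipeline is being torn down anyway.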
Why not let the Process take care of its own exceptions? If the child catches the exception itself and sends it back to the parent together with its traceback,
you have both the error and the traceback at hand in the parent.
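A minimal sketch of that idea, assuming a one-way `multiprocessing.Pipe` from child to parent. The class name `ErrorReportingProcess` and its `exception` property are hypothetical, not part of the multiprocessing API; the code should run on Python 2.7 and 3.

```python
import multiprocessing
import traceback


class ErrorReportingProcess(multiprocessing.Process):
    """Process that ships an exception raised in the child back to the parent.

    Hypothetical sketch; `exception` is this class's own property.
    """

    def __init__(self, *args, **kwargs):
        multiprocessing.Process.__init__(self, *args, **kwargs)
        # duplex=False: the first end can only receive, the second only send.
        self._parent_conn, self._child_conn = multiprocessing.Pipe(duplex=False)
        self._exception = None

    def run(self):
        try:
            multiprocessing.Process.run(self)
            self._child_conn.send(None)
        except Exception as e:
            # Traceback objects cannot be pickled, so send a formatted string.
            self._child_conn.send((e, traceback.format_exc()))

    @property
    def exception(self):
        """(exception, traceback string) if the child failed, else None."""
        if self._parent_conn.poll():
            self._exception = self._parent_conn.recv()
        return self._exception

# Usage in the parent (sketch; `work` is a placeholder target):
#     p = ErrorReportingProcess(target=work)
#     p.start()
#     p.join()
#     if p.exception:
#         error, tb = p.exception
#         print(tb)
```

Because the exception is caught, such a child exits with code 0, so the parent has to check `exception` rather than `exitcode`. Exceptions that cannot be pickled and unpickled (for example some custom exceptions with extra constructor arguments) will break the hand-off; sending `repr(e)` plus the traceback string avoids that.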
Regards, Marek
Thanks to kobejohn, I have found a solution that is nice and stable.
I have created a subclass of multiprocessing.Process (EtlStepProcess) which implements some helper functions and overrides the run() method to wrap a new saferun() method in a try/except block. This class requires a feedback_queue on initialization, which is used to report info, debug and error messages back to the parent. The log methods in the class are wrappers for the globally defined log functions of the package.
I have subclassed all my other process steps from EtlStepProcess. The code to be run is implemented in the saferun() method rather than in run(). This way I do not have to add a try/except block around it, since run() already does that.
In my main file, I submit a Process that does all the work and give it a feedback_queue. This process starts all the steps, then reads from MongoDB and puts the values on the initial queue. My main process listens to the feedback queue and prints all log messages. If it receives an error log, it prints the error and terminates its child, which in turn also terminates all of its children before dying.
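The listening side in the main process could be sketched as follows, assuming the same hypothetical `(level, sender, text)` messages and a single work process `child`; `monitor` and `poll_timeout` are illustrative names. It should run on Python 2.7 and 3.

```python
from __future__ import print_function

try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2.7


def monitor(feedback_queue, child, poll_timeout=1.0):
    """Print feedback until `child` finishes; stop it on the first error.

    Sketch: returns when the child ends cleanly, raises RuntimeError otherwise.
    """
    while True:
        # Check liveness *before* reading: if the child was already dead,
        # everything it sent is in the queue, so an empty read means "done".
        alive = child.is_alive()
        try:
            level, sender, text = feedback_queue.get(timeout=poll_timeout)
        except queue.Empty:
            if not alive:
                return
            continue
        print("[%s] %s: %s" % (level, sender, text))
        if level == "error":
            child.terminate()
            child.join()
            raise RuntimeError("%s failed:\n%s" % (sender, text))
```

For the work process to take its own children down with it, it has to handle SIGTERM itself (for example via `signal.signal`), because a process killed by the default SIGTERM action runs no cleanup code.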
I am thinking about making a module out of it and putting it up on GitHub, but I have to do some cleaning up and commenting first.
I don't know the standard practice, but what I've found is that, to get reliable multiprocessing, I design the methods/classes/etc. specifically to work with multiprocessing. Otherwise you never really know what's going on on the other side (unless I've missed some mechanism for this).
Specifically what I do is:
- Subclass multiprocessing.Process or make functions that specifically support multiprocessing (wrapping functions that you don't have control over if necessary).
- Provide a multiprocessing.Queue from the main process to each worker process for reporting errors.
- Wrap the entire run code in try: ... except Exception as e. Then, when something unexpected happens, send an error package back to the main process.
The end result is worker processes that can survive for a long time and that can let you know what's happening when something goes wrong. They die quietly, since you can handle whatever you need to do after the catch-all exception, and you will also know when you need to restart a worker.
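For functions you don't control, the "wrap it and report" step can be a small target wrapper. This is a hypothetical sketch: `guarded_call` is an illustrative name, and the error package `(pid, repr of the exception, formatted traceback)` is an assumed format, not necessarily the one used in this answer.

```python
import os
import traceback


def guarded_call(func, error_q, *args, **kwargs):
    """Run func(*args, **kwargs); on failure, report instead of dying silently.

    Hypothetical multiprocessing.Process target; the error package
    (pid, exception repr, formatted traceback) is an assumed format.
    """
    try:
        return func(*args, **kwargs)
    except Exception as e:
        # Strings pickle cleanly across a multiprocessing.Queue;
        # traceback objects do not.
        error_q.put((os.getpid(), repr(e), traceback.format_exc()))

# Usage in the parent (sketch; `parse_row` and `row` are placeholders):
#     error_q = multiprocessing.Queue()
#     p = multiprocessing.Process(target=guarded_call,
#                                 args=(parse_row, error_q, row))
#     p.start()
#     ...
#     pid, err, tb = error_q.get()
```

The pid tells the main process which worker needs restarting, and the traceback string keeps the original context for logging.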
Again, I've just come to this pattern through trial and error, so I don't know how standard it is. Does that help with what you are asking for?