I'm trying to use a queue with the multiprocessing library in Python. When I run the code below, the print statements work, but the processes do not quit after I call join on the queue; they are still alive. How can I terminate the remaining processes?
Thanks!
```python
import multiprocessing

def MultiprocessTest(self):
    print "Starting multiprocess."
    print "Number of CPUs",multiprocessing.cpu_count()

    num_procs = 4
    def do_work(message):
        print "work",message ,"completed"

    def worker():
        while True:
            item = q.get()
            do_work(item)
            q.task_done()

    q = multiprocessing.JoinableQueue()
    for i in range(num_procs):
        p = multiprocessing.Process(target=worker)
        p.daemon = True
        p.start()

    source = ['hi','there','how','are','you','doing']
    for item in source:
        q.put(item)
    print "q close"
    q.join()
    #q.close()
    print "Finished everything...."
    print "num active children:",multiprocessing.active_children()
```
The code below may not be very relevant, but I am posting it for your comments and feedback so we can learn together. Thank you!
You have to drain the queue before joining the processes, but `q.empty()` is unreliable.
The best way to drain the queue is to count the number of successful gets, or to loop until you receive a sentinel value, just as you would when reading from a socket on a reliable network.
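A minimal sketch of the counting approach, written in Python 3 rather than the question's Python 2. It assumes every task is queued before the workers start; the names `worker`, `run`, and the per-worker `quota` are illustrative, not part of the original answer. Each worker performs exactly as many blocking gets as it was assigned, and the quotas add up to the number of tasks, so no worker is left waiting on an empty queue. The parent likewise counts its gets on the results queue before it calls `join()`.

```python
import multiprocessing as mp


def worker(tasks, results, quota):
    """Perform exactly `quota` blocking gets, then return (ending the process).

    The quotas of all workers add up to the number of queued tasks, so every
    get() eventually succeeds and no worker blocks forever on an empty queue.
    """
    for _ in range(quota):
        item = tasks.get()
        results.put(f"work {item} completed")


def run(items, num_procs=4):
    tasks = mp.Queue()
    results = mp.Queue()
    for item in items:
        tasks.put(item)

    # Split len(items) gets as evenly as possible across the workers.
    base, extra = divmod(len(items), num_procs)
    quotas = [base + (1 if i < extra else 0) for i in range(num_procs)]

    procs = [mp.Process(target=worker, args=(tasks, results, quota))
             for quota in quotas]
    for p in procs:
        p.start()

    # Count successful gets on the results queue *before* joining: a process
    # that has put data on a queue does not exit until its buffered items are
    # flushed to the underlying pipe, which can require the reader to consume them.
    collected = [results.get() for _ in range(len(items))]

    for p in procs:
        p.join()
    return collected


if __name__ == "__main__":
    for line in run(['hi', 'there', 'how', 'are', 'you', 'doing']):
        print(line)
    print("num active children:", mp.active_children())
```

Counting only works when the number of tasks is known up front; if producers keep adding work while the workers run, a sentinel is the better fit.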
Here is a sentinel-free method for the relatively simple case where you put a number of tasks on a `JoinableQueue`, then launch worker processes that consume the tasks and exit once they read the queue "dry". The trick is to use `JoinableQueue.get_nowait()` instead of `get()`. As the name implies, `get_nowait()` tries to get a value from the queue in a non-blocking manner, and if there's nothing to be gotten, a `queue.Empty` exception is raised. The worker handles this exception by exiting. Rudimentary code to illustrate the principle:
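A minimal Python 3 sketch of the `get_nowait()` pattern; the function names and the separate `results` queue are illustrative assumptions. One caveat from the `multiprocessing` documentation: after an object is put on an empty queue there may be a short delay before `get_nowait()` can see it. The sketch therefore queues every task before any worker starts, and even so it is best kept for simple batch jobs, since a worker that polls before the parent's background feeder thread has flushed the tasks could find the queue momentarily "dry".

```python
import multiprocessing as mp
import queue  # multiprocessing queues raise the standard queue.Empty


def worker(tasks, results):
    """Consume tasks until the queue reads "dry", then return (ending the process)."""
    while True:
        try:
            item = tasks.get_nowait()
        except queue.Empty:
            break  # nothing left: no point hanging around
        results.put(f"work {item} completed")
        tasks.task_done()


def run(items, num_procs=4):
    tasks = mp.JoinableQueue()
    results = mp.Queue()
    # All tasks are queued before any worker starts; the pattern relies on this.
    for item in items:
        tasks.put(item)

    procs = [mp.Process(target=worker, args=(tasks, results))
             for _ in range(num_procs)]
    for p in procs:
        p.start()

    tasks.join()  # returns once every task has been marked done
    collected = [results.get() for _ in range(len(items))]  # drain before joining
    for p in procs:
        p.join()  # the workers have left their loops, so this returns
    return collected


if __name__ == "__main__":
    for line in run(['hi', 'there', 'how', 'are', 'you', 'doing']):
        print(line)
    print("num active children:", mp.active_children())
```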
The advantage is that you do not need to put the "poison pills" on the queue so the code is a bit shorter.
IMPORTANT: in more complex situations, where producers and consumers use the same queue in an "interleaved" manner and the workers may have to wait for new tasks to arrive, the "poison pill" approach should be used. My suggestion above is for simple cases where the workers "know" that if the task queue is empty, there is no point hanging around any more.
Your workers need a sentinel to terminate, or they will just sit in their blocking reads. Note that waiting on the queue in a sleep loop, instead of calling join on the processes, lets you display status information and the like while the work is in progress.
My preferred template is:
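A minimal Python 3 sketch of such a template, under a few assumptions: `None` never appears as a real task, and the names `worker`, `run`, and `poll_interval` are hypothetical. Each worker stops at its first sentinel, and the parent puts one sentinel per worker. Rather than a bare `time.sleep()`, the parent waits on the results queue with `get(timeout=...)` and prints a status line whenever nothing arrives in time; it joins the workers only after every result has been collected.

```python
import multiprocessing as mp
import queue

SENTINEL = None  # assumed "poison pill"; must never be a real task


def worker(tasks, results):
    # iter(callable, sentinel) keeps calling tasks.get() until it returns SENTINEL.
    for item in iter(tasks.get, SENTINEL):
        results.put(f"work {item} completed")


def run(items, num_procs=4, poll_interval=0.05):
    tasks = mp.Queue()
    results = mp.Queue()
    procs = [mp.Process(target=worker, args=(tasks, results))
             for _ in range(num_procs)]
    for p in procs:
        p.start()

    for item in items:
        tasks.put(item)
    for _ in procs:
        tasks.put(SENTINEL)  # one pill per worker

    collected = []
    while len(collected) < len(items):
        try:
            collected.append(results.get(timeout=poll_interval))
        except queue.Empty:
            # Nothing arrived within poll_interval: a convenient spot for status output.
            alive = sum(p.is_alive() for p in procs)
            print(f"status: {len(collected)}/{len(items)} done, {alive} workers alive")

    for p in procs:
        p.join()  # every worker has consumed its sentinel and returned
    return collected


if __name__ == "__main__":
    for line in run(['hi', 'there', 'how', 'are', 'you', 'doing']):
        print(line)
    print("num active children:", mp.active_children())
```

As written, the status loop would wait forever if a worker crashed before producing its results; a production version would also check `is_alive()` and stop when no workers remain.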
Try this:
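One way to apply sentinels to the question's code, sketched in Python 3 as a hypothetical rewrite rather than the answerer's exact code. `do_work` and `worker` move to module level so they can be pickled under the `spawn` start method, the queue is passed to each worker as an argument, and the parent puts one `None` sentinel per worker after the real items. Because the workers call `task_done()` for the sentinel as well, `q.join()` returns, and the following `p.join()` calls return because every worker has left its loop.

```python
import multiprocessing


def do_work(message):
    print("work", message, "completed")


def worker(q):
    while True:
        item = q.get()
        if item is None:  # sentinel: no more work for this process
            q.task_done()
            break
        do_work(item)
        q.task_done()


def multiprocess_test(num_procs=4):
    print("Starting multiprocess.")
    print("Number of CPUs", multiprocessing.cpu_count())

    q = multiprocessing.JoinableQueue()
    procs = []
    for _ in range(num_procs):
        p = multiprocessing.Process(target=worker, args=(q,))
        p.daemon = True
        p.start()
        procs.append(p)

    source = ['hi', 'there', 'how', 'are', 'you', 'doing']
    for item in source:
        q.put(item)
    for _ in procs:
        q.put(None)  # one sentinel per worker

    q.join()  # every item and every sentinel has been marked done
    for p in procs:
        p.join()  # the workers have exited their loops, so this returns
    print("Finished everything....")
    children = multiprocessing.active_children()
    print("num active children:", children)
    return children


if __name__ == "__main__":
    multiprocess_test()
```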