I am using the Python multiprocessing library to spawn 4 Process() objects to parallelize a CPU-intensive task. The task (inspiration and code from this great article) is to compute the prime factors of every integer in a list.
main.py:
import random
import multiprocessing
import sys

# factorize_naive(n) comes from the linked article and returns
# the list of prime factors of n.

num_inputs = 4000
num_procs = 4
proc_inputs = num_inputs/num_procs
input_list = [int(1000*random.random()) for i in xrange(num_inputs)]

output_queue = multiprocessing.Queue()
procs = []

for p_i in xrange(num_procs):
    print "Process [%d]"%p_i
    proc_list = input_list[proc_inputs * p_i:proc_inputs * (p_i + 1)]
    print " - num inputs: [%d]"%len(proc_list)

    # Using target=worker1 HANGS on join
    p = multiprocessing.Process(target=worker1, args=(p_i, proc_list, output_queue))
    # Using target=worker2 RETURNS with success
    #p = multiprocessing.Process(target=worker2, args=(p_i, proc_list, output_queue))

    procs.append(p)
    p.start()

for p in procs:
    print "joining ", p, output_queue.qsize(), output_queue.full()
    p.join()
    print "joined ", p, output_queue.qsize(), output_queue.full()

print "Processing complete."
ret_vals = []
while output_queue.empty() == False:
    ret_vals.append(output_queue.get())
print len(ret_vals)
print sys.getsizeof(ret_vals)
Observations:
- If the target for each process is the function worker1, for an input list larger than 4000 elements the main thread gets stuck on .join(), waiting for the spawned processes to terminate, and never returns.
- If the target for each process is the function worker2, for the same input list the code works just fine and the main thread returns.

This is very confusing to me, as the only difference between worker1 and worker2 (see below) is that the former puts individual lists on the Queue whereas the latter puts a single list of lists per process.

Why is there a deadlock with the worker1 target but not with worker2?
Shouldn't both (or neither) exceed the multiprocessing Queue maxsize limit of 32767?
worker1 vs worker2:
def worker1(proc_num, proc_list, output_queue):
    '''worker function which deadlocks'''
    for num in proc_list:
        output_queue.put(factorize_naive(num))

def worker2(proc_num, proc_list, output_queue):
    '''worker function that works'''
    workers_stuff = []
    for num in proc_list:
        workers_stuff.append(factorize_naive(num))
    output_queue.put(workers_stuff)
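For reference, factorize_naive comes from the linked article; a rough sketch of what it does (trial-division factorization returning a list of prime factors, not necessarily the article's exact code) is:
def factorize_naive(n):
    '''Naive trial division: return the list of prime factors of n
    (empty list for n < 2). Sketch only; the real version is in the article.'''
    if n < 2:
        return []
    factors = []
    p = 2
    while p * p <= n:
        while n % p == 0:
            factors.append(p)
            n //= p
        p += 1
    if n > 1:
        factors.append(n)
    return factors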
There are a lot of similar questions on SO, but I believe the core of this question is clearly distinct from all of them.
Related Links:
- https://sopython.com/canon/82/programs-using-multiprocessing-hang-deadlock-and-never-complete/
- python multiprocessing issues
- python multiprocessing - process hangs on join for large queue
- Process.join() and queue don't work with large numbers
- Python 3 Multiprocessing queue deadlock when calling join before the queue is empty
- Script using multiprocessing module does not terminate
- Why does multiprocessing.Process.join() hang?
- When to call .join() on a process?
- What exactly is Python multiprocessing Module's .join() Method Doing?
The docs warn about this: while a Queue appears to be unbounded, under the covers queued items are buffered in memory to avoid overloading inter-process pipes. A process cannot end normally before those memory buffers are flushed. Your worker1() puts a lot more items on the queue than your worker2(), and that's all there is to it. Note that the number of items that can be queued before the implementation resorts to buffering in memory isn't defined: it can vary across OS and Python release.

As the docs suggest, the normal way to avoid this is to .get() all the items off the queue before you attempt to .join() the processes. As you've discovered, whether it's necessary to do so depends in an undefined way on how many items have been put on the queue by each worker process.
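In terms of your code, that means draining output_queue before joining. A minimal sketch of the reordered end of main.py, reusing your variable names (procs, output_queue, num_inputs) and assuming worker1 puts exactly one item per input and that num_inputs divides evenly among the processes:
# Drain the queue BEFORE joining: pull every result while the workers
# are still alive, so their feeder threads can flush the pipe buffers.
ret_vals = []
for _ in xrange(num_inputs):   # worker1 puts one item per input number
    ret_vals.append(output_queue.get())

# Now the workers can terminate normally and join() returns promptly.
for p in procs:
    p.join()

print "Processing complete."
print len(ret_vals)
With that ordering, .get() blocks until each result arrives, so by the time the loop finishes every buffered item has been flushed and the joins cannot hang.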