I'm trying to use multiprocessing to spawn 4 processes that brute-force some calculations, where on each iteration every process has a very small chance of modifying a single list object that I want to share between them. I know the programming guidelines advise against sharing state, but I need "many hands" here.
The code works fine for relatively small numbers of iterations, but once the count passes a certain threshold all four processes go into a zombie state. They fail silently.
I track the modifications to the shared list using a multiprocessing.Queue(). It appears from this SO post, this closed Python issue (resolved as "not a bug"), and several posts referring to them, that the underlying pipe can become overloaded and the processes simply hang. The accepted answer in the SO post is very hard to decipher because of the amount of excess code.
Edited for clarity:
The examples in the documentation do very lightweight things, almost always single function calls, so I don't know whether I'm misunderstanding and abusing these features.
The guidelines say:
It is probably best to stick to using queues or pipes for communication between processes rather than using the lower level synchronization primitives from the threading module.
Does "communicate" here mean something other than what I'm actually doing in my example?
Or does this mean that I should be sharing my_list in the queue rather than with a manager? Wouldn't this mean queue.get and queue.put on every iteration of every process?
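If that is the intended reading, I imagine each process would do something like the following whenever it changes the list (purely a hypothetical sketch with my own names, not code I'm running):

import random

# Hypothetical sketch: the list itself travels through the queue instead of
# living in a manager. The parent would queue.put() the starting list once
# before starting the processes.
def mutate_list_via_queue(iterations, queue):
    for x in xrange(iterations):
        if random.random() < 0.01:
            my_list = queue.get()   # only one process holds the list at a time
            random.shuffle(my_list)
            queue.put(my_list)      # hand it back so the others see the change

With only one copy of the list ever on the queue, the get() would effectively act as the lock, and the queue would only be touched when a process actually changes the list, but it still seems like an odd fit for the guideline.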
If maxsize is less than or equal to zero, the queue size is infinite.
Doing this does not fix the error in my failing example. Until the point where I call queue.put(), all of the data is stored in a normal Python list, my_return_list, so is this actually failing for the reasons described in the links I provided?
Is there a better way of doing this than my current workaround? I can't seem to find anyone taking a similar approach, so I feel I'm missing something. I need this to work on both Windows and Linux.
Failing example (depending on the number of iterations under __main__):
import multiprocessing as mp
import random
import sys

def mutate_list(my_list, proc_num, iterations, queue, lock):
    my_return_list = []
    if iterations < 1001:
        # Works fine
        for x in xrange(iterations):
            if random.random() < 0.01:
                lock.acquire()
                print "Process {} changed list from:".format(proc_num)
                print my_list
                print "to"
                random.shuffle(my_list)
                print my_list
                print "........"
                sys.stdout.flush()
                lock.release()
                my_return_list.append([x, list(my_list)])
    else:
        for x in xrange(iterations):
            # Enters zombie state
            if random.random() < 0.01:
                lock.acquire()
                random.shuffle(my_list)
                my_return_list.append([x, list(my_list)])
                lock.release()
            if x % 1000 == 0:
                print "Completed iterations:", x
                sys.stdout.flush()
    # Everything this process collected is pushed through the queue in one go
    queue.put(my_return_list)

def multi_proc_handler(iterations):
    manager = mp.Manager()
    ns = manager.list()
    ns.extend([x for x in range(10)])
    queue = mp.Queue()
    lock = manager.Lock()
    print "Starting list to share", ns
    print ns
    sys.stdout.flush()
    p = [mp.Process(target=mutate_list, args=(ns, x, iterations, queue, lock))
         for x in range(4)]
    for process in p: process.start()
    for process in p: process.join()
    output = [queue.get() for process in p]
    return output

if __name__ == '__main__':
    # 1000 iterations is fine, 100000 iterations will create zombies
    multi_caller = multi_proc_handler(100000)
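For reference, if the linked posts are right that the children block inside queue.put() because nothing reads the other end of the pipe while the parent sits in join(), then I assume I would have to drain the queue before joining. Something like this (untested sketch, same mutate_list as above, only the ordering in the handler changes):

def multi_proc_handler_drain_first(iterations):
    # Same as multi_proc_handler above, except queue.get() is called before
    # join(), so the parent empties the pipe while the children are still alive.
    manager = mp.Manager()
    ns = manager.list([x for x in range(10)])
    queue = mp.Queue()
    lock = manager.Lock()
    p = [mp.Process(target=mutate_list, args=(ns, x, iterations, queue, lock))
         for x in range(4)]
    for process in p: process.start()
    output = [queue.get() for process in p]   # drain first...
    for process in p: process.join()          # ...then join
    return output

I haven't verified this on Windows, hence the question above about whether I'm missing something more fundamental.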
Workaround using multiprocessing.Manager.list():
import multiprocessing as mp
import random
import sys

def mutate_list(my_list, proc_num, iterations, my_final_list, lock):
    for x in xrange(iterations):
        if random.random() < 0.01:
            lock.acquire()
            random.shuffle(my_list)
            # Append straight to the managed list instead of queueing at the end
            my_final_list.append([x, list(my_list)])
            lock.release()
        if x % 10000 == 0:
            print "Completed iterations:", x
            sys.stdout.flush()

def multi_proc_handler(iterations):
    manager = mp.Manager()
    ns = manager.list([x for x in range(10)])
    lock = manager.Lock()
    my_final_list = manager.list()  # My Queue substitute
    print "Starting list to share", ns
    print ns
    sys.stdout.flush()
    p = [mp.Process(target=mutate_list, args=(ns, x, iterations, my_final_list,
                                              lock)) for x in range(4)]
    for process in p: process.start()
    for process in p: process.join()
    return list(my_final_list)

if __name__ == '__main__':
    multi_caller = multi_proc_handler(100000)
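My guess is that the workaround avoids the hang because each append() is sent to the manager process as a small message on its own, so there is never one large queue.put() blocked on a full pipe while the parent waits in join(), but I would like confirmation that this is actually what is going on, and whether the manager-based approach has pitfalls of its own on Windows.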