I create 100 child processes:
    proc_list = [
        Process(target=simulator, args=(result_queue,))
        for i in xrange(100)]
and start them:
    for proc in proc_list:
        proc.start()
After doing some processing, each process puts 10000 tuples into result_queue (an instance of multiprocessing.Queue).
    def simulate(alg_instance, image_ids, gamma, results,
                 simulations, sim_semaphore):
        (rs, qs, t_us) = alg_instance.simulate_multiple(image_ids, gamma,
                                                        simulations)
        all_tuples = zip(rs, qs, t_us)
        for result in all_tuples:
            results.put(result)
        sim_semaphore.release()
I should be (?) getting 1000000 tuples in the queue (100 processes x 10000 tuples each), but over several runs I get sizes like these (sample):
14912
19563
12952
13524
7487
18350
15986
11928
14281
14282
7317
Any suggestions?
My solution to multiprocessing issues is almost always to use Manager objects. While the exposed interface is the same, the underlying implementation is much simpler and has fewer bugs.
    from multiprocessing import Manager
    manager = Manager()
    result_queue = manager.Queue()
Try it out and see if it doesn't fix your issues.
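For reference, here is a minimal, self-contained sketch of that approach. It assumes Python 3 (the question's code uses Python 2's xrange), and worker and count_with_manager_queue are hypothetical stand-ins for the real simulation, not names from the question:

```python
import multiprocessing as mp


def worker(results, n_items):
    """Hypothetical stand-in for the simulation: put n_items tuples on the queue."""
    for i in range(n_items):
        results.put((i, i * 2, i * 3))


def count_with_manager_queue(n_procs, n_items, start_method=None):
    """Run n_procs workers and return how many tuples reached the queue."""
    ctx = mp.get_context(start_method)  # None selects the platform default
    with ctx.Manager() as manager:
        results = manager.Queue()
        procs = [ctx.Process(target=worker, args=(results, n_items))
                 for _ in range(n_procs)]
        for proc in procs:
            proc.start()
        # Wait for every worker before counting, so no puts are still pending.
        for proc in procs:
            proc.join()
        # Drain the queue and count what actually arrived.
        count = 0
        while not results.empty():
            results.get()
            count += 1
        return count


if __name__ == "__main__":
    # Small demo run; raise the numbers to 100 and 10000 to match the question.
    print(count_with_manager_queue(8, 1000))
```

Because every put on a manager queue is a round trip to the manager process, this is noticeably slower than a plain multiprocessing.Queue, so the full 100 x 10000 run may take a while.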
The documentation describes multiprocessing.Queue as thread and process safe. Even so, when you are doing inter-process communication with a queue, I would use multiprocessing.Manager().Queue() instead.
There's no evidence in the OP's post that multiprocessing.Queue does not work. The code posted by the OP is not at all sufficient to understand what's going on: do they join all the processes? Do they correctly pass the queue to the child processes (it has to be passed as a parameter on Windows)? Do their child processes verify that they actually got 10000 tuples? And so on.
There's a chance that the OP is really encountering a hard-to-reproduce bug in mp.Queue, but given the amount of testing CPython has gone through, and the fact that I just ran 100 processes x 10000 results without any trouble, I suspect the OP actually had some problem in their own code.
Yes, Manager().Queue(), mentioned in other answers, is a perfectly fine way to share data, but there's no reason to avoid multiprocessing.Queue() based on unconfirmed reports that "something is wrong with it".
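A check of this kind could look roughly like the sketch below. It is not the exact script I ran, just a minimal version assuming Python 3; produce and collect_all are hypothetical names, and the None sentinel is my own convention for marking a finished worker. The parent drains the queue before joining, because joining a process that still has unflushed items on a multiprocessing.Queue can block.

```python
import multiprocessing as mp


def produce(results, n_items):
    """Hypothetical worker: put n_items tuples, then a None sentinel."""
    for i in range(n_items):
        results.put((i, i + 1, i + 2))
    results.put(None)  # tells the parent this worker has finished


def collect_all(n_procs, n_items, start_method=None):
    """Start n_procs workers and return every tuple they produced."""
    ctx = mp.get_context(start_method)  # None selects the platform default
    results = ctx.Queue()
    # The queue is passed explicitly as an argument to each child process.
    procs = [ctx.Process(target=produce, args=(results, n_items))
             for _ in range(n_procs)]
    for proc in procs:
        proc.start()

    collected = []
    finished = 0
    # Drain the queue until every worker has sent its sentinel.
    while finished < n_procs:
        item = results.get()
        if item is None:
            finished += 1
        else:
            collected.append(item)

    # Join only after the queue has been drained.
    for proc in procs:
        proc.join()
    return collected


if __name__ == "__main__":
    # Small demo run; use collect_all(100, 10000) for the question's scale.
    print(len(collect_all(10, 10000)))
```

Each worker's sentinel is put after its own tuples, so once the parent has seen one sentinel per worker, nothing can still be in flight. Checking the queue size while workers are still running, by contrast, can easily report a partial count.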