multiprocessing.Queue as arg to pool worker aborts

Posted 2019-02-19 21:08

Question:

I'm finding it hard to believe that I've actually run into this issue; it seems like it would be a big bug in the Python multiprocessing module. Anyway, the problem I've run into is that whenever I pass a multiprocessing.Queue to a multiprocessing.Pool worker as an argument, the pool worker never executes its code. I've been able to reproduce this even with a very simple test that is a slightly modified version of example code found in the Python docs.

Here is the original version of the example code for Queues:

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])


if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())  # prints "[42, None, 'hello']"
    p.join()

Here is my modified version of the example code for Queues:

from multiprocessing import Queue, Pool

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Pool(1)
    p.apply_async(f,args=(q,))
    print(q.get()) # prints "[42, None, 'hello']"
    p.close()
    p.join()

All I've done is make p a process pool of size 1 instead of a multiprocessing.Process object, and the result is that the code hangs on the print statement forever because nothing is ever written to the Queue! Of course I tested this in its original form and it works fine. My OS is Windows 10 and my Python version is 3.5.x. Does anyone have any idea why this is happening?

Update: I still have no idea why this example code works with a multiprocessing.Process and not a multiprocessing.Pool, but I found a workaround I'm content with (Alex Martelli's answer). Apparently you can just make a global list of multiprocessing.Queues and pass each process an index to use; I'm going to avoid using a managed queue because they are slower. Thanks Guest for showing me the link. A rough sketch of that workaround follows.
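Here is how I read that workaround (a sketch, not exactly the linked answer's code; init_worker and the two-queue list are just my illustration). The key point is that the queues reach the workers through the pool initializer, i.e. during process creation, rather than as pickled task arguments, which also makes it work with Windows' spawn start method:

from multiprocessing import Pool, Queue

# Set in each worker process by the pool initializer.
worker_queues = None

def init_worker(qs):
    # Runs once in every worker; the queues arrive via process creation,
    # not as pickled task arguments.
    global worker_queues
    worker_queues = qs

def f(idx):
    # Only a picklable index travels with the task itself.
    worker_queues[idx].put([42, None, 'hello'])

if __name__ == '__main__':
    queues = [Queue() for _ in range(2)]
    p = Pool(2, initializer=init_worker, initargs=(queues,))
    p.apply_async(f, args=(0,))
    print(queues[0].get())  # prints "[42, None, 'hello']"
    p.close()
    p.join()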

Answer 1:

Problem

When you call apply_async it returns an AsyncResult object and leaves the workload distribution to a separate thread (see also this answer). This thread encounters the problem that the Queue object can't be pickled, and therefore the requested work can't be distributed (and so is never executed). We can see this by calling AsyncResult.get:

r = p.apply_async(f,args=(q,))
r.get()

which raises a RuntimeError:

RuntimeError: Queue objects should only be shared between processes through inheritance

However, this RuntimeError is only raised in the main thread once you request the result, because it actually occurred in a different thread (and thus needs a way to be transmitted back).

So what happens when you do

p.apply_async(f,args=(q,))

is that the target function f is never invoked, because one of its arguments (q) can't be pickled. Therefore q never receives an item and remains empty, and for that reason the call to q.get() in the main thread blocks forever.
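If you want the failure to surface without explicitly calling get, apply_async also accepts an error_callback (Python 3), which the pool invokes in the parent process once the task fails. A minimal sketch, assuming you just want the error printed (the lambda is my illustration):

from multiprocessing import Queue, Pool

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Pool(1)
    # The pickling error is reported to error_callback instead of being
    # silently stored until .get() is called; e is the RuntimeError above.
    p.apply_async(f, args=(q,),
                  error_callback=lambda e: print('task failed:', e))
    p.close()
    p.join()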

Solution

With apply_async you don't have to manage result queues manually; the results are readily provided to you in the form of AsyncResult objects. So you can modify the code to simply return from the target function:

from multiprocessing import Pool

def f():
    return [42, None, 'hello']

if __name__ == '__main__':
    p = Pool(1)
    result = p.apply_async(f)
    print(result.get())  # prints "[42, None, 'hello']"
    p.close()
    p.join()
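If you genuinely need a queue that pool workers can put into, one alternative (slower, as the question's update notes) is a managed queue: its proxy object can be pickled and passed as a task argument. A rough sketch:

from multiprocessing import Pool, Manager

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    m = Manager()
    q = m.Queue()  # a proxy object, which can be pickled as a task argument
    p = Pool(1)
    p.apply_async(f, args=(q,))
    print(q.get())  # prints "[42, None, 'hello']"
    p.close()
    p.join()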