Passing a Pipe/Connection as context arg to multip

Posted 2019-07-23 10:12

Question:

I want to use pipes to talk to the process instances in my pool, but I'm getting an error:

Let __p be an instance of Pool():

    (master_pipe, worker_pipe) = Pipe()

    self.__p.apply_async(_worker_task, 
                         (handler_info, 
                          context_info,
                          worker_pipe))

When I execute this, I get the following error [for every instance, obviously]:

  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 376, in get
    task = get()
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 376, in get
TypeError: Required argument 'handle' (pos 1) not found
    self.run()
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 114, in run
    return recv()
    return recv()
    self._target(*self._args, **self._kwargs)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 102, in worker
TypeError: Required argument 'handle' (pos 1) not found
TypeError: Required argument 'handle' (pos 1) not found
    task = get()
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 376, in get
    return recv()
TypeError: Required argument 'handle' (pos 1) not found

The error is specifically referring to the Connection instance that I'm trying to pass. If I make it "None", the workers fork without error.

I don't understand this since, as the documentation shows by example, I can easily pass the same argument to a Process() and have it work perfectly:

from multiprocessing import Pipe, Process
def call_me(p):
  print("Here: %s" % (p))

(master, worker) = Pipe()
p = Process(target=call_me, args=(worker,))
p.start()

Here: <read-write Connection, handle 6>

p.join()

Answer 1:

This looks like the bug at http://bugs.python.org/issue4892, noted in the discussion "Python 2.6 send connection object over Queue / Pipe / etc".

The pool forks its child processes up front, with pipes for communicating tasks and results to and from them. The failure happens when your Pipe object is sent over that existing pipe, not during the fork: the child process fails when it calls get() on the queue abstraction.

It looks like the problem arises because of how the Pipe object is pickled/unpickled for communication.

In the second case that you noted, the pipe is passed to a Process instance, which is then forked, so the Connection is inherited by the child rather than pickled. Hence the difference in behavior.
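To make the Process case concrete, here is a minimal sketch in which the child actually sends a message back over the inherited Connection. It is written for Python 3 and assumes a Unix-like system where the "fork" start method is available; the names run_process_demo and the message text are made up for illustration.

```python
import multiprocessing as mp


def call_me(conn):
    # Runs in the child process. With the "fork" start method the
    # Connection is inherited from the parent, not sent over a queue.
    conn.send("hello from the child")
    conn.close()


def run_process_demo():
    # Assumption: "fork" is available (Linux, macOS). Not valid on Windows.
    ctx = mp.get_context("fork")
    master, worker = ctx.Pipe()
    p = ctx.Process(target=call_me, args=(worker,))
    p.start()
    message = master.recv()  # blocks until the child has sent its message
    p.join()
    return message


if __name__ == "__main__":
    print(run_process_demo())
```

No task queue is involved here, which is why the Pool failure does not show up in this path.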

I can't imagine that actively communicating with pool processes outside of pure task distribution was an intended use case for multiprocessing pool though. State/protocol-wise, that would imply that you would want more control over the process. That would require more context than what the general Pool object could ever know.



Answer 2:

This can be solved by using the initializer and initargs arguments when you create the pool and its processes. Admittedly, a global variable has to be involved as well. However, if you put the worker code in a separate module, it doesn't look all that bad, and the variable is only global to that process. :-)

A typical case is that you want your worker processes to add items to a multiprocessing queue. The queue is backed by operating-system resources (a pipe and locks) rather than plain data, so pickling it as a task argument will not work. Even if it did work, it would only copy data describing the fact that some process has a queue, which is the opposite of what we want here: we want to share the same queue.

Here is a sketch of the idea:

The module containing the worker code, which we call "worker_module":

def worker_init(_the_queue):
    global the_queue
    the_queue = _the_queue

def do_work(_a_string):
    # Add something to the queue
    the_queue.put("the string " + _a_string)

And the creation of the pool, followed by having it do some work:

# Import our functions
from worker_module import worker_init, do_work

# Good idea: Call it MPQueue to not confuse it with the other Queue
from multiprocessing import Queue as MPQueue
from multiprocessing import Pool

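# The guard keeps child processes from re-running this block on platforms
# that start workers by importing the main module
if __name__ == "__main__":
    the_queue = MPQueue()
    # Initialize the workers; initialization is the only point at which we can pass the_queue
    the_pool = Pool(processes=3, initializer=worker_init, initargs=(the_queue,))
    # Do the work
    the_pool.apply(do_work, ("my string",))
    # The string is now on the queue
    my_string = the_queue.get(True)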
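The same initializer approach can be applied to the Pipe from the original question. The following is only a sketch under some assumptions: Python 3, a Unix-like system with the "fork" start method, and hypothetical names (_conn, _lock, run_pipe_pool_demo). Because several workers share one end of the pipe, a Lock is passed through the initializer as well, so that two workers never write to the same Connection at the same time.

```python
import multiprocessing as mp

# Per-process globals, filled in by worker_init in each pool worker.
_conn = None
_lock = None


def worker_init(conn, lock):
    global _conn, _lock
    _conn = conn
    _lock = lock


def do_work(a_string):
    # Serialize writes: concurrent sends on one pipe end may corrupt data.
    with _lock:
        _conn.send("the string " + a_string)


def run_pipe_pool_demo():
    # Assumption: "fork" is available (Linux, macOS). Not valid on Windows.
    ctx = mp.get_context("fork")
    master_conn, worker_conn = ctx.Pipe()
    lock = ctx.Lock()
    items = ["a", "b", "c", "d"]
    with ctx.Pool(processes=2, initializer=worker_init,
                  initargs=(worker_conn, lock)) as pool:
        pool.map(do_work, items)  # returns once every task has finished
        received = [master_conn.recv() for _ in items]
    # Arrival order depends on scheduling, so sort before returning.
    return sorted(received)


if __name__ == "__main__":
    print(run_pipe_pool_demo())
```

The messages here are small, so the workers can finish sending before the parent starts reading. For larger payloads the parent would need to read while the tasks are still running, otherwise the pipe buffer could fill up and block the workers.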


Answer 3:

This is a bug which has been fixed in Python 3.

The easiest solution is to pass the queue through the Pool's initializer, as suggested in the other answer.
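For comparison, here is a sketch of the original approach (passing the Connection directly as a task argument) as it is expected to behave on Python 3, where Connection objects can be transferred between processes. It assumes Python 3.3 or later and a Unix-like system with the "fork" start method; report and run_direct_demo are hypothetical names.

```python
import multiprocessing as mp


def report(conn, value):
    # The Connection arrives in the worker as a task argument.
    conn.send(value * 2)
    conn.close()
    return value


def run_direct_demo():
    # Assumption: "fork" is available (Linux, macOS). Not valid on Windows.
    ctx = mp.get_context("fork")
    parent_conn, child_conn = ctx.Pipe()
    with ctx.Pool(processes=1) as pool:
        result = pool.apply_async(report, (child_conn, 21))
        returned = result.get(timeout=30)  # re-raises any worker-side error
        received = parent_conn.recv()
    return returned, received


if __name__ == "__main__":
    print(run_direct_demo())
```

On Python 2.7 the equivalent call fails with the TypeError shown in the question, so the initializer route remains the safer choice there.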