Can I somehow share an asynchronous queue with a s

Posted 2020-05-17 16:57

Question:

I would like to use a queue for passing data from a parent to a child process which is launched via multiprocessing.Process. However, since the parent process uses Python's new asyncio library, the queue methods need to be non-blocking. As far as I understand, asyncio.Queue is made for inter-task communication and cannot be used for inter-process communication. Also, I know that multiprocessing.Queue has the put_nowait() and get_nowait() methods but I actually need coroutines that would still block the current task (but not the whole process). Is there some way to create coroutines that wrap put_nowait()/get_nowait()? On another note, are the threads that multiprocessing.Queue uses internally compatible after all with an event loop running in the same process?

If not, what other options do I have? I know I could implement such a queue myself by making use of asynchronous sockets but I hoped I could avoid that…

EDIT: I also considered using pipes instead of sockets, but it seems asyncio is not compatible with multiprocessing.Pipe(). More precisely, Pipe() returns a tuple of Connection objects, which are not file-like objects. However, asyncio.BaseEventLoop's add_reader()/add_writer() and connect_read_pipe()/connect_write_pipe() methods all expect file-like objects, so it is impossible to asynchronously read from/write to such a Connection. In contrast, the usual file-like objects that the subprocess package uses as pipes pose no problem at all and can easily be used in combination with asyncio.

UPDATE: I decided to explore the pipe approach a bit further: I converted the Connection objects returned by multiprocessing.Pipe() into file-like objects by retrieving the file descriptor via fileno() and passing it to os.fdopen(). Finally, I passed the resulting file-like object to the event loop's connect_read_pipe()/connect_write_pipe(). (There is some mailing list discussion on a related issue if someone is interested in the exact code.) However, read()ing the stream gave me an OSError: [Errno 9] Bad file descriptor and I didn't manage to fix this. Also considering the missing support for Windows, I will not pursue this any further.
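
For reference, on Unix the event loop's add_reader() also accepts a raw file descriptor, so the fileno() of a Connection can be registered directly, without wrapping it in a file object via os.fdopen(). The following is only a sketch under that assumption (a selector-based event loop and Python 3.7+, so it does not address the Windows concern); recv_async and child are hypothetical names, and conn.recv() may still block briefly if a message has only partly arrived.

```python
import asyncio
from multiprocessing import Pipe, Process


async def recv_async(conn):
    """Wait until conn is readable, then return conn.recv().

    Assumption: a Unix selector event loop, where add_reader() accepts
    the raw file descriptor returned by conn.fileno().
    """
    loop = asyncio.get_running_loop()
    ready = loop.create_future()
    fd = conn.fileno()

    def on_readable():
        # Unregister first so the callback does not keep firing.
        loop.remove_reader(fd)
        if not ready.done():
            ready.set_result(None)

    loop.add_reader(fd, on_readable)
    try:
        await ready  # Suspends only this task, not the whole process
    finally:
        loop.remove_reader(fd)  # Harmless if already removed
    return conn.recv()


def child(conn):
    # Runs in the child process (hypothetical example worker).
    conn.send("hello from child")
    conn.close()


if __name__ == "__main__":
    reader, writer = Pipe(duplex=False)
    proc = Process(target=child, args=(writer,))
    proc.start()
    print(asyncio.run(recv_async(reader)))
    proc.join()
```

The file descriptor stays owned by the Connection object here, so nothing else closes it behind the Connection's back.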

Answer 1:

Here is an implementation of a multiprocessing.Queue object that can be used with asyncio. It provides the entire multiprocessing.Queue interface, with the addition of coro_get and coro_put methods, which are asyncio.coroutines that can be used to asynchronously get/put from/into the queue. The implementation details are essentially the same as the second example of my other answer: ThreadPoolExecutor is used to make the get/put asynchronous, and a multiprocessing.managers.SyncManager.Queue is used to share the queue between processes. The only additional trick is implementing __getstate__ to keep the object picklable despite using a non-picklable ThreadPoolExecutor as an instance variable.

import asyncio
from multiprocessing import Manager, cpu_count
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def AsyncProcessQueue(maxsize=0):
    m = Manager()
    q = m.Queue(maxsize=maxsize)
    return _ProcQueue(q)   

class _ProcQueue(object):
    def __init__(self, q):
        self._queue = q
        self._real_executor = None
        self._cancelled_join = False

    @property
    def _executor(self):
        if not self._real_executor:
            self._real_executor = ThreadPoolExecutor(max_workers=cpu_count())
        return self._real_executor

    def __getstate__(self):
        # Copy the dict so pickling doesn't reset this instance's own executor
        self_dict = self.__dict__.copy()
        self_dict['_real_executor'] = None
        return self_dict

    def __getattr__(self, name):
        if name in ['qsize', 'empty', 'full', 'put', 'put_nowait',
                    'get', 'get_nowait', 'close']:
            return getattr(self._queue, name)
        else:
            raise AttributeError("'%s' object has no attribute '%s'" % 
                                    (self.__class__.__name__, name))

    @asyncio.coroutine
    def coro_put(self, item):
        loop = asyncio.get_event_loop()
        return (yield from loop.run_in_executor(self._executor, self.put, item))

    @asyncio.coroutine    
    def coro_get(self):
        loop = asyncio.get_event_loop()
        return (yield from loop.run_in_executor(self._executor, self.get))

    def cancel_join_thread(self):
        self._cancelled_join = True
        self._queue.cancel_join_thread()

    def join_thread(self):
        self._queue.join_thread()
        if self._real_executor and not self._cancelled_join:
            self._real_executor.shutdown()

@asyncio.coroutine
def _do_coro_proc_work(q, stuff, stuff2):
    ok = stuff + stuff2
    print("Passing %s to parent" % ok)
    yield from q.coro_put(ok)  # Non-blocking
    item = q.get() # Can be used with the normal blocking API, too
    print("got %s back from parent" % item)

def do_coro_proc_work(q, stuff, stuff2):
    loop = asyncio.get_event_loop()
    loop.run_until_complete(_do_coro_proc_work(q, stuff, stuff2))

@asyncio.coroutine
def do_work(q):
    loop = asyncio.get_event_loop()  # Don't rely on the global set under __main__
    loop.run_in_executor(ProcessPoolExecutor(max_workers=1),
                         do_coro_proc_work, q, 1, 2)
    item = yield from q.coro_get()
    print("Got %s from worker" % item)
    item = item + 25
    q.put(item)

if __name__  == "__main__":
    q = AsyncProcessQueue()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(do_work(q))

Output:

Passing 3 to parent
Got 3 from worker
got 28 back from parent

As you can see, you can use the AsyncProcessQueue both synchronously and asynchronously, from either the parent or child process. It doesn't require any global state, and by encapsulating most of the complexity in a class, is more elegant to use than my original answer.
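
On current Python versions, @asyncio.coroutine and yield from have given way to async def and await (the decorator was removed in Python 3.11). As a rough sketch of how the two coroutine methods could look in that style, assuming Python 3.7+: AsyncQueueAdapter is a hypothetical name, not part of the implementation above, and it uses the loop's default thread pool instead of owning an executor.

```python
import asyncio
import queue


class AsyncQueueAdapter:
    """Hypothetical helper: adds awaitable get/put to any blocking queue."""

    def __init__(self, q):
        self._queue = q

    async def coro_put(self, item):
        loop = asyncio.get_running_loop()
        # Passing None selects the loop's default ThreadPoolExecutor.
        await loop.run_in_executor(None, self._queue.put, item)

    async def coro_get(self):
        loop = asyncio.get_running_loop()
        # The blocking get() runs in a worker thread; only this task waits.
        return await loop.run_in_executor(None, self._queue.get)


async def demo():
    # queue.Queue stands in for a Manager().Queue() proxy in this sketch.
    q = AsyncQueueAdapter(queue.Queue())
    await q.coro_put(3)
    return await q.coro_get()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the adapter holds no executor of its own, it needs no __getstate__ override; whether it can be pickled then depends only on the wrapped queue.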

You'll probably be able to get better performance using sockets directly, but getting that working in a cross-platform way seems to be pretty tricky. This approach also has the advantage of being usable across multiple workers, and it doesn't require you to pickle/unpickle anything yourself.
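
The __getstate__ trick mentioned earlier can also be looked at in isolation. This is a minimal sketch, with LazyExecutorHolder and roundtrip as hypothetical names: the executor is created lazily and dropped from a copy of the instance state, so the object pickles cleanly while the original keeps its executor.

```python
import pickle
from concurrent.futures import ThreadPoolExecutor


class LazyExecutorHolder:
    """Hypothetical class: owns an unpicklable executor, yet stays picklable."""

    def __init__(self, payload):
        self.payload = payload
        self._real_executor = None

    @property
    def executor(self):
        # Created on first use, so an unpickled copy simply builds its own.
        if self._real_executor is None:
            self._real_executor = ThreadPoolExecutor(max_workers=1)
        return self._real_executor

    def __getstate__(self):
        # Work on a copy so the live instance keeps its executor.
        state = self.__dict__.copy()
        state["_real_executor"] = None
        return state


def roundtrip(obj):
    """Pickle and unpickle obj, as happens when it is sent to a worker."""
    return pickle.loads(pickle.dumps(obj))


if __name__ == "__main__":
    holder = LazyExecutorHolder("data")
    holder.executor.submit(print, "executor works").result()
    clone = roundtrip(holder)
    print(clone.payload, clone._real_executor)
```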



Answer 2:

The multiprocessing library isn't particularly well-suited for use with asyncio, unfortunately. Depending on how you were planning to use multiprocessing and multiprocessing.Queue, however, you may be able to replace them completely with a concurrent.futures.ProcessPoolExecutor:

import asyncio
from concurrent.futures import ProcessPoolExecutor


def do_proc_work(stuff, stuff2):  # This runs in a separate process
    return stuff + stuff2

@asyncio.coroutine
def do_work():
    loop = asyncio.get_event_loop()  # Don't rely on the global set under __main__
    out = yield from loop.run_in_executor(ProcessPoolExecutor(max_workers=1),
                                          do_proc_work, 1, 2)
    print(out)

if __name__  == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(do_work())

Output:

3
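
The same idea extends to several jobs at once. The following is a sketch in async/await style (Python 3.7+), where run_jobs and add are hypothetical names: each job is handed to the executor and all results are awaited together, so the event loop stays free while the workers run.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def add(a, b):  # Runs in a worker process when a ProcessPoolExecutor is used
    return a + b


async def run_jobs(executor, jobs):
    """Submit every (func, args) job to executor and await all results.

    Results come back in the same order as the jobs were given.
    """
    loop = asyncio.get_running_loop()
    futures = [loop.run_in_executor(executor, func, *args)
               for func, args in jobs]
    return await asyncio.gather(*futures)


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as pool:
        print(asyncio.run(run_jobs(pool, [(add, (1, 2)), (add, (3, 4))])))
```

Reusing one executor for all jobs, rather than creating a new one per call, avoids starting a fresh worker process each time.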

If you absolutely need a multiprocessing.Queue, it seems like it will behave OK when combined with ProcessPoolExecutor:

import asyncio
import time
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def do_proc_work(q, stuff, stuff2):
    ok = stuff + stuff2
    time.sleep(5) # Artificial delay to show that it's running asynchronously
    print("putting output in queue")
    q.put(ok)

@asyncio.coroutine
def async_get(q):
    """ Calls q.get() in a separate Thread.

    q.get is an I/O call, so it should release the GIL.
    Ideally there would be a real non-blocking I/O-based
    Queue.get call that could be used as a coroutine instead
    of this, but I don't think one exists.

    """
    loop = asyncio.get_event_loop()  # Don't rely on the global set under __main__
    return (yield from loop.run_in_executor(ThreadPoolExecutor(max_workers=1),
                                            q.get))

@asyncio.coroutine
def do_work(q):
    loop = asyncio.get_event_loop()  # Don't rely on the global set under __main__
    loop.run_in_executor(ProcessPoolExecutor(max_workers=1),
                         do_proc_work, q, 1, 2)
    coro = async_get(q)  # You could yield from it right here; it's deferred just to show that it's asynchronous
    print("Getting queue result asynchronously")
    print((yield from coro))

if __name__  == "__main__":
    m = multiprocessing.Manager()
    q = m.Queue()  # A Manager queue proxy is picklable, so it can be passed to the worker; a plain multiprocessing.Queue can only be inherited
    loop = asyncio.get_event_loop()
    loop.run_until_complete(do_work(q))

Output:

Getting queue result asynchronously
putting output in queue
3
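
The reason a Manager queue is used here can be checked directly. Pickling a plain multiprocessing.Queue outside of process spawning raises a RuntimeError, while a Manager queue proxy can be pickled, which is what passing it as an executor argument requires. A small sketch, with can_pickle as a hypothetical helper:

```python
import multiprocessing
import pickle


def can_pickle(obj):
    """Return True if obj survives pickle.dumps(), False on RuntimeError."""
    try:
        pickle.dumps(obj)
        return True
    except RuntimeError:
        # multiprocessing.Queue refuses to be pickled unless a child
        # process is currently being spawned.
        return False


if __name__ == "__main__":
    print("plain queue picklable:", can_pickle(multiprocessing.Queue()))
    with multiprocessing.Manager() as m:
        print("manager queue picklable:", can_pickle(m.Queue()))
```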