How to combine Python asyncio and multiprocessing?

Posted 2019-08-20 20:09

Question:

I have a device whose incoming data requires CPU-bound deserialization and decoding, which is a good fit for multiprocessing; the rest of the application is slower, IO-bound code that is an excellent fit for asyncio. However, there seems to be no good way to combine multiprocessing and asyncio.

I have tried https://github.com/dano/aioprocessing, which wraps the blocking multiprocessing operations in thread executors. However, this library does not play well with common asyncio operations; for example, cancelling a coroutine that is waiting on a queue get with this library leads to deadlock (see the sketch below).
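For reference, this is roughly the pattern I tried with aioprocessing; AioQueue and coro_get() are the names from that library's README, and the snippet is a trimmed-down illustration rather than my actual code. The deadlock appears when a task awaiting coro_get() on an empty queue is cancelled.

import asyncio
import aioprocessing


async def main():
    # AioQueue is a multiprocessing queue with coroutine wrappers.
    queue = aioprocessing.AioQueue()

    queue.put("hello")
    # coro_get() runs the blocking Queue.get in a thread executor.
    # Cancelling a task that awaits coro_get() on an *empty* queue is where
    # I hit the deadlock described above.
    print(await queue.coro_get())


if __name__ == "__main__":
    asyncio.run(main())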

I have also tried using a ProcessPoolExecutor, but passing multiprocessing objects to the executor does not work, because a multiprocessing.Queue can only be shared with a child process by inheritance at process creation time, not pickled and sent over afterwards.

import multiprocessing
import asyncio
import atexit
from concurrent.futures import ProcessPoolExecutor


@atexit.register
def kill_children():
    # Make sure no stray worker processes outlive the interpreter.
    for p in multiprocessing.active_children():
        p.kill()


async def queue_get(queue: multiprocessing.Queue):
    # Run the blocking Queue.get in a worker process so it does not block
    # the event loop. This is where the RuntimeError below is raised: a plain
    # multiprocessing.Queue cannot be pickled into the executor's worker.
    executor = ProcessPoolExecutor(max_workers=1)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, queue.get)


async def main():
    queue = multiprocessing.Queue()
    get_task = asyncio.create_task(queue_get(queue))

    queue.put(None)

    print(await get_task)


if __name__ == "__main__":
    asyncio.run(main())

Running this code leads to this exception:

RuntimeError: Queue objects should only be shared between processes through inheritance

Is there any way to cleanly bridge the gap between multiprocessing and asyncio?

Answer 1:

Per the question "Can I somehow share an asynchronous queue with a subprocess?":

The above code can be modified to work by creating the queue through a multiprocessing.Manager(). Unlike a plain multiprocessing.Queue, the manager's queue is a picklable proxy object that talks to the manager's server process, so it can be handed to an executor worker that was created independently of the queue:

import multiprocessing
import asyncio
import atexit
from concurrent.futures import ProcessPoolExecutor


@atexit.register
def kill_children():
    # Make sure no stray worker processes outlive the interpreter.
    for p in multiprocessing.active_children():
        p.kill()


async def queue_get(queue: multiprocessing.Queue):
    executor = ProcessPoolExecutor(max_workers=1)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, queue.get)


async def main():
    manager = multiprocessing.Manager()
    # manager.Queue() returns a picklable proxy, so it can be passed to the
    # executor's worker process (unlike a plain multiprocessing.Queue).
    queue = manager.Queue()
    get_task = asyncio.create_task(queue_get(queue))

    queue.put(None)
    print(await get_task)


if __name__ == "__main__":
    asyncio.run(main())
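To connect this back to the original use case (CPU-bound decoding in a separate process, IO-bound asyncio code in the main process), the same manager queue can carry decoded items from a worker process to an asyncio consumer. Below is a minimal sketch under that assumption; decode_worker, raw_queue and decoded_queue are illustrative names, and the .upper() call is a stand-in for the real deserialization.

import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor


def decode_worker(raw_queue, decoded_queue):
    # Hypothetical CPU-bound stage: runs in its own process and feeds
    # decoded items back to the asyncio side through a manager queue.
    while True:
        raw = raw_queue.get()
        if raw is None:  # sentinel: shut down
            decoded_queue.put(None)
            break
        decoded_queue.put(raw.upper())  # stand-in for real decoding


async def consume(decoded_queue, executor):
    loop = asyncio.get_running_loop()
    while True:
        # Block inside the executor, not inside the event loop.
        item = await loop.run_in_executor(executor, decoded_queue.get)
        if item is None:
            break
        print("decoded:", item)


async def main():
    manager = multiprocessing.Manager()
    raw_queue = manager.Queue()      # main process -> decoder
    decoded_queue = manager.Queue()  # decoder -> asyncio consumer

    worker = multiprocessing.Process(
        target=decode_worker, args=(raw_queue, decoded_queue), daemon=True
    )
    worker.start()

    with ProcessPoolExecutor(max_workers=1) as executor:
        consumer = asyncio.create_task(consume(decoded_queue, executor))

        for raw in ("one", "two", "three"):
            raw_queue.put(raw)
        raw_queue.put(None)  # ask the decoder to shut down

        await consumer

    worker.join()


if __name__ == "__main__":
    asyncio.run(main())

Since decoded_queue.get only blocks while waiting, a ThreadPoolExecutor would work just as well for that part; the manager queue is what makes the object safe to hand across the executor boundary at all.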