How can I wrap a synchronous function in an async

2019-03-09 18:11发布

问题:

I'm using aiohttp to build an API server that sends TCP requests off to a seperate server. The module that sends the TCP requests is synchronous and a black box for my purposes. So my problem is that these requests are blocking the entire API. I need a way to wrap the module requests in an asynchronous coroutine that won't block the rest of the API.

So, just using sleep as a simple example, is there any way to somehow wrap time-consuming synchronous code in a non-blocking coroutine, something like this:

async def sleep_async(delay):
    # After calling sleep, loop should be released until sleep is done
    yield sleep(delay)
    return 'I slept asynchronously'

回答1:

Eventually I found an answer in this thread. The method I was looking for is run_in_executor. This allows a synchronous function to be run asynchronously without blocking an event loop.

In the sleep example I posted above, it might look like this:

import asyncio
from time import sleep
from concurrent.futures import ProcessPoolExecutor

async def sleep_async(loop, delay):
    # Can set executor to None if a default has been set for loop
    await loop.run_in_executor(ProcessPoolExecutor(), sleep, delay)
    return 'I slept asynchronously'

Also see the following answer -> How do we call a normal function where a coroutine is expected?



回答2:

You can use a decorator to wrap the sync version to an async version.

import time
from functools import wraps, partial


def wrap(func):
    @wraps(func)
    async def run(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_event_loop()
        pfunc = partial(func, *args, **kwargs)
        return await loop.run_in_executor(executor, pfunc)
    return run

@wrap
def sleep_async(delay):
    time.sleep(delay)
    return 'I slept asynchronously'

or use the aioify lib

% pip install aioify

then

@aioify
def sleep_async(delay):
    pass


回答3:

Not sure if too late but you can also use a decorator to do your function in a thread. ALTHOUGH, note that it will still be non-coop blocking unlike async which is co-op blocking.

def wrap(func):
    from concurrent.futures import ThreadPoolExecutor
    pool=ThreadPoolExecutor()
    @wraps(func)
    async def run(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_event_loop()
        future=pool.submit(func, *args, **kwargs)
        return asyncio.wrap_future(future)
    return run

Hope that helps!