How to combine Celery with asyncio?

Posted 2019-02-08 06:23

Question:

How can I create a wrapper that makes celery tasks look like asyncio.Task? Or is there a better way to integrate Celery with asyncio?

@asksol, the creator of Celery, said this:

It's quite common to use Celery as a distributed layer on top of async I/O frameworks (top tip: routing CPU-bound tasks to a prefork worker means they will not block your event loop).

But I could not find any code examples specifically for the asyncio framework.

Answer 1:

According to the official site, this should become possible starting with Celery 5.0:

http://docs.celeryproject.org/en/4.0/whatsnew-4.0.html#preface

  1. The next major version of Celery will support Python 3.5 only, where we are planning to take advantage of the new asyncio library.
  2. Dropping support for Python 2 will enable us to remove massive amounts of compatibility code, and going with Python 3.5 allows us to take advantage of typing, async/await, asyncio, and similar concepts there’s no alternative for in older versions.

The above is quoted from the link.

So the best thing to do is wait for version 5.0 to be released!

In the meantime, happy coding :)



Answer 2:

You can wrap any blocking call into a Task using run_in_executor, as described in the documentation. I also added a custom timeout to the example:

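import asyncio
import functools
from concurrent.futures import ThreadPoolExecutor

# Thread pool that runs the blocking Celery calls off the event loop.
executor = ThreadPoolExecutor()


async def run_async_task(target, *args, timeout=60, **keywords):
    """Run a blocking callable in the executor, with a timeout in seconds."""
    loop = asyncio.get_running_loop()
    return await asyncio.wait_for(
        loop.run_in_executor(
            executor,
            functools.partial(target, *args, **keywords)
        ),
        timeout=timeout,
    )


loop = asyncio.new_event_loop()
# your_task is a Celery task defined elsewhere; delay() returns an AsyncResult.
async_result = loop.run_until_complete(
    run_async_task(your_task.delay, some_arg, some_karg="")
)
# AsyncResult.get blocks until the task finishes, so it also goes through the executor.
result = loop.run_until_complete(
    run_async_task(async_result.get)
)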
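
The pattern above can be tried without a Celery broker. The following is a minimal sketch, assuming a hypothetical blocking_fetch function that stands in for a blocking call such as AsyncResult.get; the helper name run_blocking, the pool size, and the delays are illustrative and not part of the answer.

```python
import asyncio
import functools
import time
from concurrent.futures import ThreadPoolExecutor

# Illustrative thread pool; size it for the number of concurrent blocking calls.
executor = ThreadPoolExecutor(max_workers=4)


def blocking_fetch(value, delay=0.05):
    """Hypothetical stand-in for a blocking call such as AsyncResult.get."""
    time.sleep(delay)
    return value * 2


async def run_blocking(target, *args, timeout=60, **kwargs):
    """Await a blocking callable run in the executor, bounded by a timeout."""
    loop = asyncio.get_running_loop()
    future = loop.run_in_executor(
        executor, functools.partial(target, *args, **kwargs)
    )
    return await asyncio.wait_for(future, timeout=timeout)


async def main():
    # Both calls run in worker threads, so the event loop stays responsive.
    return await asyncio.gather(
        run_blocking(blocking_fetch, 1),
        run_blocking(blocking_fetch, 2, delay=0.1),
    )


results = asyncio.run(main())
```

Keep in mind that the timeout only stops the waiting: the worker thread keeps running the blocking call until it returns on its own.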


Answer 3:

The cleanest way I've found to do this is to wrap the async function in asgiref.sync.async_to_sync (from asgiref):

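from asyncio import sleep

from asgiref.sync import async_to_sync
from celery.task import periodic_task  # available in Celery 4.x; removed in 5.0


async def return_hello():
    await sleep(1)
    return 'hello'


@periodic_task(
    run_every=2,
    name='return_hello',
)
def task_return_hello():
    # async_to_sync runs the coroutine to completion and returns its result.
    return async_to_sync(return_hello)()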

I pulled this example from a blog post I wrote.
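
If adding asgiref is not an option, a similar bridge from sync to async code can be approximated with the standard library alone. This is only a minimal sketch, assuming the task body runs in a thread where no event loop is already running; run_sync is a hypothetical helper name, and the Celery decorator is left out so the snippet runs on its own.

```python
import asyncio


async def return_hello():
    await asyncio.sleep(0.01)
    return 'hello'


def run_sync(coro_func, *args, **kwargs):
    """Hypothetical helper: run a coroutine function to completion from sync code."""
    # asyncio.run creates a fresh event loop and closes it afterwards;
    # it raises RuntimeError if a loop is already running in this thread.
    return asyncio.run(coro_func(*args, **kwargs))


def task_return_hello():
    # In a real project this function would carry the Celery task decorator.
    return run_sync(return_hello)


print(task_return_hello())
```

Unlike async_to_sync, this sketch does not handle being called from inside a running event loop, so it is only a starting point.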