How can I create a wrapper that makes celery tasks look like asyncio.Task
? Or is there a better way to integrate Celery with asyncio
?
@asksol, the creator of Celery, said this::
It's quite common to use Celery as a distributed layer on top of async I/O frameworks (top tip: routing CPU-bound tasks to a prefork worker means they will not block your event loop).
But I could not find any code examples specifically for asyncio
framework.
That will be possible from Celery version 5.0 as stated in the official site:
http://docs.celeryproject.org/en/4.0/whatsnew-4.0.html#preface
- The next major version of Celery will support Python 3.5 only, were we are planning to take advantage of the new asyncio library.
- Dropping support for Python 2 will enable us to remove massive amounts of compatibility code, and going with Python 3.5 allows us to take advantage of typing, async/await, asyncio, and similar concepts there’s no alternative for in older versions.
The above were quoted from the previous link.
So the best thing to do is wait for version 5.0 to be distributed!
In the mean time, happy coding :)
You can wrap any blocking call into a Task using run_in_executor
as described in documentation, I also added in the example a custom timeout:
def run_async_task(
target,
*args,
timeout = 60,
**keywords
) -> Future:
loop = asyncio.get_event_loop()
return asyncio.wait_for(
loop.run_in_executor(
executor,
functools.partial(target, *args, **keywords)
),
timeout=timeout,
loop=loop
)
loop = asyncio.get_event_loop()
async_result = loop.run_until_complete(
run_async_task, your_task.delay, some_arg, some_karg=""
)
result = loop.run_until_complete(
run_async_task, async_result.result
)
The cleanest way I've found to do this is to wrap the async
function in asgiref.sync.async_to_sync
(from asgiref
):
from asgiref.sync import async_to_sync
from celery.task import periodic_task
async def return_hello():
await sleep(1)
return 'hello'
@periodic_task(
run_every=2,
name='return_hello',
)
def task_return_hello():
async_to_sync(return_hello)()
I pulled this example from a blog post I wrote.