How to schedule and cancel tasks with asyncio

Posted 2020-03-02 04:04

I am writing a client-server application. While connected, the client sends the server a "heartbeat" signal at a fixed interval, for example every second. On the server side I need a mechanism for adding tasks (or coroutines, or something else) to be executed asynchronously. I also want a client's tasks to be cancelled when that client stops sending the heartbeat signal.

In other words, when the server starts a task, the task gets a timeout or TTL, for example 3 seconds. Each time the server receives the heartbeat signal it resets the timer for another 3 seconds, until the task is done or the client disconnects (stops sending the signal).

Here is an example of cancelling a task, taken from the asyncio tutorial on pymotw.com. But here the task is cancelled before the event loop starts, which is not suitable for me.

import asyncio

async def task_func():
    print('in task_func')
    return 'the result'


event_loop = asyncio.get_event_loop()
try:
    print('creating task')
    task = event_loop.create_task(task_func())

    print('canceling task')
    task.cancel()

    print('entering event loop')
    event_loop.run_until_complete(task)
    print('task: {!r}'.format(task))
except asyncio.CancelledError:
    print('caught error from cancelled task')
else:
    print('task result: {!r}'.format(task.result()))
finally:
    event_loop.close()

1 answer
男人必须洒脱
#2 · 2020-03-02 04:40

You can use asyncio's Task wrapper to run your coroutine as a task, via the ensure_future() function.

ensure_future() automatically wraps your coroutine in a Task and schedules it on the event loop. The Task then drives the coroutine from one await to the next (or until the coroutine finishes).

In other words, pass a regular coroutine to ensure_future() and assign the resulting Task object to a variable. You can then call Task.cancel() when you need to stop it; the cancellation is delivered as a CancelledError raised inside the coroutine at the await where it is suspended.
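As a minimal sketch of that start-then-cancel sequence on a loop that is already running (the names worker, main and log are made up for illustration, and asyncio.sleep() stands in for real work):

```python
import asyncio

async def worker(log):
    """Hypothetical long-running job that records what happens to it."""
    log.append('started')
    try:
        # pause point: this is where a cancellation request is delivered
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        log.append('cancelled')
        raise  # re-raise so the Task ends up marked as cancelled

async def main():
    log = []
    task = asyncio.ensure_future(worker(log))
    await asyncio.sleep(0)   # yield once so the task gets to start
    task.cancel()            # request cancellation of the running task
    try:
        await task           # wait until the cancellation has been processed
    except asyncio.CancelledError:
        pass
    return log, task.cancelled()

log, was_cancelled = asyncio.run(main())
```

Awaiting the task after cancel() is what lets the caller know the coroutine has actually stopped; cancel() on its own only schedules the request.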

import asyncio

async def task_func():
    print('in task_func')
    # A task that runs for a while needs an await statement to provide
    # a pause point, so that other coroutines can run in the meantime.
    # asyncio.sleep() is a stand-in here for a database call or some
    # other long-running background coroutine.
    await asyncio.sleep(3600)
    # If this is a one-off thing, return the result instead,
    # but then you don't really need a Task wrapper...
    # return 'the result'

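# Placeholder so the example runs: real server code would set this to True
# while heartbeats keep arriving and to False once they stop.
heartbeat = False

async def my_app():
    my_task = None
    while True:
        # yield to the event loop so other coroutines can run
        await asyncio.sleep(0)

        # listen for trigger / heartbeat: start the task if none exists
        if heartbeat and my_task is None:
            my_task = asyncio.ensure_future(task_func())

        # also listen for termination of heartbeat / connection
        elif not heartbeat and my_task is not None:
            if not my_task.done():
                my_task.cancel()
            else:
                # cancelled (or finished) on an earlier pass: forget it
                my_task = None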

# asyncio.run() (Python 3.7+) starts an event loop and runs my_app() on it.
# my_app() never returns, so this keeps running until interrupted.
asyncio.run(my_app())

Note that Tasks are meant for long-running work that needs to keep going in the background without interrupting the main flow. If all you need is a quick one-off call, just await the coroutine (or call the function) directly instead.
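For the resettable timeout described in the question, one possible approach is to arm a timer with loop.call_later() and re-arm it on every heartbeat. This is only a sketch under assumptions: HeartbeatWatchdog, beat(), long_job() and demo() are hypothetical names, the TTL is shortened so the demo finishes quickly, and the heartbeats are simulated with a loop instead of coming from a real client.

```python
import asyncio

class HeartbeatWatchdog:
    """Cancels a task unless beat() is called at least every `ttl` seconds."""

    def __init__(self, task, ttl):
        self._task = task
        self._ttl = ttl
        self._handle = None
        self.beat()  # arm the first countdown

    def beat(self):
        """Call on every heartbeat: drop the old timer and start a new one."""
        if self._handle is not None:
            self._handle.cancel()
        loop = asyncio.get_running_loop()
        # when the timer fires, task.cancel() is called with no arguments
        self._handle = loop.call_later(self._ttl, self._task.cancel)

    def stop(self):
        """Disarm the timer, for example once the task has finished."""
        if self._handle is not None:
            self._handle.cancel()
            self._handle = None

async def long_job():
    await asyncio.sleep(3600)  # stand-in for real server-side work

async def demo(ttl=0.2, beats=3):
    task = asyncio.ensure_future(long_job())
    watchdog = HeartbeatWatchdog(task, ttl)
    # simulated client: heartbeats arrive well inside the TTL
    for _ in range(beats):
        await asyncio.sleep(ttl / 4)
        watchdog.beat()
    alive_while_beating = not task.done()
    # the client goes silent: the timer expires and cancels the task
    try:
        await task
    except asyncio.CancelledError:
        pass
    watchdog.stop()
    return alive_while_beating, task.cancelled()

alive_while_beating, cancelled_after_silence = asyncio.run(demo())
```

In a real server, the handler that receives a heartbeat message would call beat() on the watchdog belonging to that client, so no polling loop is needed.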
