How to add a coroutine to a running asyncio loop?

Posted 2019-03-17 21:48

How can one add a new coroutine to a running asyncio loop, i.e. one that is already executing a set of coroutines?

I guess as a workaround one could wait for existing coroutines to complete and then initialize a new loop (with the additional coroutine). But is there a better way?

4 answers
Bombasti
#2 · 2019-03-17 22:02

To add a coroutine to an already running event loop you can use:

asyncio.ensure_future(my_coro())

In my case I was using multithreading (threading) alongside asyncio and wanted to add a task to an event loop that was already running in another thread. For anyone else in the same situation, be sure to pass the event loop explicitly, since a worker thread has no event loop of its own. Also note that ensure_future is not thread-safe; from a different thread, use asyncio.run_coroutine_threadsafe, which takes the loop as its second argument. i.e.:

In global scope:

event_loop = asyncio.get_event_loop()

Then later, inside your Thread:

asyncio.run_coroutine_threadsafe(my_coro(), event_loop)
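
For the threaded case, asyncio.run_coroutine_threadsafe is the documented thread-safe way to hand a coroutine to a loop running in another thread. Below is a minimal sketch of that pattern, assuming Python 3.7+; my_coro's body, the worker function and the results list are hypothetical names made up for illustration.

```python
import asyncio
import threading


async def my_coro(n):
    # Stand-in for real work; this runs on the event loop's thread.
    await asyncio.sleep(0.01)
    return n * 2


def worker(loop, results):
    # Runs in a plain thread: submit the coroutine to the loop thread-safely.
    future = asyncio.run_coroutine_threadsafe(my_coro(21), loop)
    # future is a concurrent.futures.Future; result() blocks this thread only.
    results.append(future.result(timeout=5))


async def main():
    loop = asyncio.get_running_loop()
    results = []
    thread = threading.Thread(target=worker, args=(loop, results))
    thread.start()
    # Join the thread in the default executor so the event loop keeps running.
    await loop.run_in_executor(None, thread.join)
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))  # prints [42]
```

The loop has to stay alive while the thread waits on future.result(), which is why main joins the thread through run_in_executor instead of calling thread.join() directly.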
我想做一个坏孩纸
#3 · 2019-03-17 22:03

You can use create_task for scheduling new coroutines:

import asyncio

async def cor1():
    ...

async def cor2():
    ...

async def main(loop):
    await asyncio.sleep(0)
    t1 = loop.create_task(cor1())
    await cor2()
    await t1

loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()
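
On Python 3.7 and later the same idea can be written without passing the loop around, using asyncio.create_task and asyncio.run. This is only a sketch: the bodies of cor1 and cor2 and the log list are assumptions added to make the interleaving visible.

```python
import asyncio


async def cor1(log):
    log.append("cor1 start")
    await asyncio.sleep(0.01)
    log.append("cor1 end")
    return "cor1"


async def cor2(log):
    log.append("cor2 start")
    await asyncio.sleep(0.02)
    log.append("cor2 end")
    return "cor2"


async def main():
    log = []
    # Schedule cor1 on the already running loop; it starts once main yields.
    t1 = asyncio.create_task(cor1(log))
    r2 = await cor2(log)  # cor1 makes progress while cor2 is sleeping
    r1 = await t1         # collect cor1's result (it has already finished)
    return r1, r2, log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

create_task only schedules the coroutine; it actually runs when the current coroutine reaches an await that suspends, so the task must be awaited (or otherwise kept alive) before main returns.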
forever°为你锁心
#4 · 2019-03-17 22:06

Your question is very close to "How to add function call to running program?"

When exactly do you need to add a new coroutine to the event loop?

Let's look at some examples. Here is a program that starts an event loop with two coroutines running concurrently:

import asyncio
from random import randint


async def coro1():
    res = randint(0,3)
    await asyncio.sleep(res)
    print('coro1 finished with output {}'.format(res))
    return res

async def main():
    await asyncio.gather(
        coro1(),
        coro1()
    ) # here we have two coroutines running concurrently

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Output:

coro1 finished with output 1
coro1 finished with output 2
[Finished in 2.2s]

Maybe you need to add some coroutines that take the result of coro1 and use it as soon as it's ready? In that case, just create a coroutine that awaits coro1 and uses its return value:

import asyncio
from random import randint


async def coro1():
    res = randint(0,3)
    await asyncio.sleep(res)
    print('coro1 finished with output {}'.format(res))
    return res

async def coro2():
    res = await coro1()
    res = res * res
    await asyncio.sleep(res)
    print('coro2 finished with output {}'.format(res))
    return res

async def main():
    await asyncio.gather(
        coro2(),
        coro2()
    ) # here we have two coroutines running concurrently

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Output:

coro1 finished with output 1
coro2 finished with output 1
coro1 finished with output 3
coro2 finished with output 9
[Finished in 12.2s]

Think of coroutines as regular functions with special syntax. You can start a set of them to run concurrently (with asyncio.gather), start the next one after the first is done, or write new ones that call others.

\"骚年 ilove
#5 · 2019-03-17 22:21

None of the answers here seem to answer the question exactly. It is possible to add tasks to a running event loop by having a "parent" task do it for you. I'm not sure what the most Pythonic way is to make sure that the parent doesn't end until its children have all finished (assuming that's the behavior you want), but this does work.

import asyncio
import random


async def add_event(n):
    print('starting ' + str(n))
    await asyncio.sleep(n)
    print('ending ' + str(n))
    return n


async def main(loop):

    added_tasks = []

    delays = list(range(5))

    # shuffle to simulate unknown run times
    random.shuffle(delays)

    for n in delays:
        print('adding ' + str(n))
        task = loop.create_task(add_event(n))
        added_tasks.append(task)
        await asyncio.sleep(0)

    print('done adding tasks')

    # make a list of tasks that (maybe) haven't completed
    running_tasks = added_tasks[::]

    # wait until we see that all tasks have completed
    while running_tasks:
        running_tasks = [x for x in running_tasks if not x.done()]
        await asyncio.sleep(0)

    print('done running tasks')

    # extract the results from the tasks and return them
    results = [x.result() for x in added_tasks]
    return results


loop = asyncio.get_event_loop()
results = loop.run_until_complete(main(loop))
loop.close()
print(results)
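
One common way to keep the parent alive until its children finish is to await asyncio.gather on the collected tasks instead of polling done() in a loop. The following is a sketch of that variation, assuming Python 3.7+; the delays are scaled down and passed in as a parameter here, which is a change made for illustration.

```python
import asyncio


async def add_event(n):
    await asyncio.sleep(n / 100)  # scaled-down delay so the example runs fast
    return n


async def main(delays):
    added_tasks = []
    for n in delays:
        # Each task is added while the loop is already running.
        added_tasks.append(asyncio.create_task(add_event(n)))
        await asyncio.sleep(0)  # yield so the new task can start
    # gather suspends the parent until every child task has finished and
    # returns the results in the order the tasks were passed in.
    return await asyncio.gather(*added_tasks)


if __name__ == "__main__":
    print(asyncio.run(main([3, 0, 4, 1, 2])))  # prints [3, 0, 4, 1, 2]
```

Unlike the polling loop, gather lets the parent sleep until the children complete, so the event loop is not woken repeatedly just to check task status.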