Parallel asynchronous IO in Python's coroutine

2019-02-22 15:47发布

问题:

Simple example: I need to make two unrelated HTTP requests in parallel. What's the simplest way to do that? I expect it to be like that:

async def do_the_job():
    with aiohttp.ClientSession() as session:
        coro_1 = session.get('http://httpbin.org/get')
        coro_2 = session.get('http://httpbin.org/ip')
        return combine_responses(await coro_1, await coro_2)

In other words, I want to initiate IO operations and wait for their results so they effectively run in parallel. This can be achieved with asyncio.gather:

async def do_the_job():
    with aiohttp.ClientSession() as session:
        coro_1 = session.get('http://example.com/get')
        coro_2 = session.get('http://example.org/tp')
        return combine_responses(*(await asyncio.gather(coro_1, coro_2)))

Next, I want to have some complex dependency structure. I want to start operations when I have all prerequisites for them and get results when I need the results. Here helps asyncio.ensure_future which makes separate task from coroutine which is managed by event loop separately:

async def do_the_job():
    with aiohttp.ClientSession() as session:
        fut_1 = asyncio.ensure_future(session.get('http://httpbin.org/ip'))
        coro_2 = session.get('http://httpbin.org/get')
        coro_3 = session.post('http://httpbin.org/post', data=(await coro_2)
        coro_3_result = await coro_3
        return combine_responses(await fut_1, coro_3_result)

Is it true that, to achieve parallel non-blocking IO with coroutines in my logic flow, I have to use either asyncio.ensure_future or asyncio.gather (which actually uses asyncio.ensure_future)? Is there a less "verbose" way?

Is it true that normally developers have to think what coroutines should become separate tasks and use aforementioned functions to gain optimal performance?

Is there a point in using coroutines without multiple tasks in event loop?

How "heavy" are event loop tasks in real life? Surely, they're "lighter" than OS threads or processes. To what extent should I strive for minimal possible number of such tasks?

回答1:

I need to make two unrelated HTTP requests in parallel. What's the simplest way to do that?

import asyncio
import aiohttp


async def request(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()


async def main():
    results = await asyncio.gather(
        request('http://httpbin.org/delay/1'),
        request('http://httpbin.org/delay/1'),
    )
    print(len(results))


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
    loop.run_until_complete(loop.shutdown_asyncgens())
finally:
    loop.close()

Yes, you may achieve concurrency with asyncio.gather or creating task with asyncio.ensure_future.

Next, I want to have some complex dependency structure? I want to start operations when I have all prerequisites for them and get results when I need the results.

While code you provided will do job, it would be nicer to split concurrent flows on different coroutines and again use asyncio.gather:

import asyncio
import aiohttp


async def request(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()


async def get_ip():
    return await request('http://httpbin.org/ip')


async def post_from_get():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://httpbin.org/get') as resp:
            get_res = await resp.text()
        async with session.post('http://httpbin.org/post', data=get_res) as resp:
            return await resp.text()


async def main():
    results = await asyncio.gather(
        get_ip(),
        post_from_get(),
    )
    print(len(results))


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
    loop.run_until_complete(loop.shutdown_asyncgens())
finally:
    loop.close()

Is it true that normally developers have to think what coroutines should become separate tasks and use aforementioned functions to gain optimal performance?

Since you use asyncio you probably want to run some jobs concurrently to gain performance, right? asyncio.gather is a way to say - "run these jobs concurrently to get their results faster".

In case you shouldn't have to think what jobs should be ran concurrently to gain performance you may be ok with plain sync code.

Is there a point in using coroutines without multiple tasks in event loop?

In your code you don't have to create tasks manually if you don't want it: both snippets in this answer don't use asyncio.ensure_future. But internally asyncio uses tasks constantly (for example, as you noted asyncio.gather uses tasks itself).

How "heavy" are event loop tasks in real life? Surely, they're "lighter" than OS threads or processes. To what extent should I strive for minimal possible number of such tasks?

Main bottleneck in async program is (almost always) network: you shouldn't worry about number of asyncio coroutines/tasks at all.