How to know when StreamReader is ready?

Posted 2019-08-02 09:26

I'm using asyncio to make TCP connections:

reader, writer = await asyncio.open_connection(*addr)  # assuming addr is a (host, port) tuple

I need to keep connections alive. For this, I'm storing a pair of (reader, writer) for future communications. However, I don't know when reader has data to read. What can I do with it? Is there a way to make a handler, when the reader is ready?

1 answer
可以哭但决不认输i
#2 · 2019-08-02 10:24

However, I don't know when reader has data to read. What can I do with it?

The obvious way to know when a reader stream has data to read is to await it:

data = await reader.read(1024)

This will either return the data right away, or suspend the current coroutine, allowing other coroutines to make progress, and only resuming this one when the reader has some data to read. Instead of storing the reader/writer for future communication, you can write a coroutine that does the communication, and store the task that drives it:

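import asyncio

async def communicate():
    # assuming addr is a (host, port) tuple; open_connection takes host and port separately
    reader, writer = await asyncio.open_connection(*addr)
    # echo every received line back to the peer
    while True:
        line = await reader.readline()
        if not line:
            break  # empty bytes means the peer closed the connection
        writer.write(line)
        await writer.drain()  # backpressure, see https://tinyurl.com./hqylfay
    writer.close()

# from inside a running coroutine (Python 3.7+):
task = asyncio.create_task(communicate())
# the task can itself be awaited, canceled, etc.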

The idea behind the asyncio stream API is to write such sequential-looking code, leaving it to asyncio to handle polling of file descriptors and scheduling of tasks. You can use combinators like asyncio.gather and asyncio.wait to run thousands of such lightweight coroutines concurrently on a single thread.
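To make the asyncio.gather point concrete, here is a minimal sketch that starts a throwaway echo server on the loopback interface and runs many client coroutines against it at once. The names handle_echo, echo_once and main, the loopback address, and the client count are all assumptions made for illustration; they are not part of the question.

```python
import asyncio

async def handle_echo(reader, writer):
    # Server side: echo each line back until the client closes.
    while True:
        line = await reader.readline()
        if not line:
            break
        writer.write(line)
        await writer.drain()
    writer.close()
    await writer.wait_closed()

async def echo_once(host, port, message):
    # Client side: one connection, one line out, one line back.
    reader, writer = await asyncio.open_connection(host, port)
    writer.write(message + b"\n")
    await writer.drain()
    reply = await reader.readline()
    writer.close()
    await writer.wait_closed()
    return reply.rstrip(b"\n")

async def main(n_clients=50):
    # Port 0 lets the OS pick a free port for this demo server.
    server = await asyncio.start_server(handle_echo, "127.0.0.1", 0)
    host, port = server.sockets[0].getsockname()[:2]
    messages = [b"msg %d" % i for i in range(n_clients)]
    async with server:
        # gather runs all client coroutines concurrently and
        # returns their results in the order they were passed in.
        replies = await asyncio.gather(
            *(echo_once(host, port, m) for m in messages)
        )
    return messages, replies

if __name__ == "__main__":
    messages, replies = asyncio.run(main())
    print(replies == messages)
```

Each client spends most of its time suspended in an await, so a single thread can serve all of them; no client code needs to ask whether its reader is "ready".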

Is there a way to make a handler, when the reader is ready?

If you need a callback-based API, you should probably use the lower-level transports and protocols instead. However, if you are already working with streams, but still occasionally need an ordinary callback, you can get it by obtaining a Future:

future = asyncio.ensure_future(reader.read(1024))
future.add_done_callback(your_callback)

The future acts as a handle to the pending read. Once the read completes, the event loop invokes the done-callback with a single argument, the future. By then the future has finished, so calling its result() method either returns the received data or raises the exception that the read failed with.
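A runnable sketch of this callback pattern follows. It assumes a throwaway loopback server (the hypothetical greet handler) that sends one short message, and it uses an asyncio.Event only so the demo knows when the callback has fired; neither is part of the question.

```python
import asyncio

async def greet(reader, writer):
    # Hypothetical demo server: send one message, then close.
    writer.write(b"hello")
    await writer.drain()
    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(greet, "127.0.0.1", 0)
    host, port = server.sockets[0].getsockname()[:2]
    received = []
    done = asyncio.Event()

    def on_read_done(future):
        # Invoked by the event loop with the finished future.
        try:
            received.append(future.result())
        except Exception as exc:
            received.append(exc)
        done.set()

    async with server:
        reader, writer = await asyncio.open_connection(host, port)
        # Wrap the read in a task and attach an ordinary callback.
        future = asyncio.ensure_future(reader.read(1024))
        future.add_done_callback(on_read_done)
        await done.wait()
        writer.close()
        await writer.wait_closed()
    return received

if __name__ == "__main__":
    print(asyncio.run(main()))
```

The callback is a plain function, not a coroutine, so it should return quickly; anything that needs to await belongs in a coroutine scheduled from it.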

(The above applies to any coroutine or future-compatible object in asyncio, not just to StreamReader methods.)
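For comparison, the callback-based transports and protocols API mentioned earlier could look roughly like the sketch below. The Collector class, the greet handler and the loopback server are assumptions for illustration only; the point is that data_received is called by the event loop whenever bytes arrive, with no explicit read call.

```python
import asyncio

class Collector(asyncio.Protocol):
    # Hypothetical protocol: gather incoming bytes until the peer closes.
    def __init__(self, closed):
        self.chunks = []
        self.closed = closed  # future resolved when the connection ends

    def data_received(self, data):
        # Called by the event loop each time data arrives.
        self.chunks.append(data)

    def connection_lost(self, exc):
        if not self.closed.done():
            self.closed.set_result(b"".join(self.chunks))

async def greet(reader, writer):
    # Hypothetical demo server: send one message, then close.
    writer.write(b"hello")
    await writer.drain()
    writer.close()
    await writer.wait_closed()

async def main():
    loop = asyncio.get_running_loop()
    server = await asyncio.start_server(greet, "127.0.0.1", 0)
    host, port = server.sockets[0].getsockname()[:2]
    closed = loop.create_future()
    async with server:
        transport, _ = await loop.create_connection(
            lambda: Collector(closed), host, port
        )
        data = await closed
        transport.close()  # harmless if the transport is already closed
    return data

if __name__ == "__main__":
    print(asyncio.run(main()))
```

This style trades the sequential look of the stream API for explicit event handlers, which can suit code that is already organized around callbacks.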
