I'm using asyncio to make TCP connections:
reader, writer = await asyncio.open_connection(host, port)
I need to keep the connections alive. To do that, I'm storing each (reader, writer) pair for future communication. However, I don't know when reader has data to read. What can I do with it? Is there a way to make a handler, when the reader is ready?
However, I don't know when reader has data to read. What can I do with it?
The obvious way to know when a reader stream has data to read is to await it:
data = await reader.read(1024)
This will either return the data right away, or suspend the current coroutine, allowing other coroutines to make progress, and only resuming this one when the reader has some data to read. Instead of storing the reader/writer for future communication, you can write a coroutine that does the communication, and store the task that drives it:
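import asyncio

# host and port identify the server to connect to
async def communicate(host, port):
    reader, writer = await asyncio.open_connection(host, port)
    # an echo server
    while True:
        line = await reader.readline()
        if not line:
            break  # empty read means the peer closed the connection
        writer.write(line)
        await writer.drain()  # backpressure, see https://tinyurl.com./hqylfay
    writer.close()

# loop is the event loop object that will run the task
task = loop.create_task(communicate(host, port))
# the task can itself be awaited, canceled, etc.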
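To illustrate the remark that the task can be awaited or canceled, here is a minimal sketch. It assumes a throwaway local server (the hypothetical `silent` handler, bound to 127.0.0.1 on an OS-chosen port) that never sends anything, so the client stays suspended in readline() until the task is canceled. The 0.1 second pause is an arbitrary choice, not something the original code requires.

```python
import asyncio

async def communicate(host, port):
    reader, writer = await asyncio.open_connection(host, port)
    try:
        while True:
            line = await reader.readline()
            if not line:
                break
            writer.write(line)
            await writer.drain()
    finally:
        # Runs on normal exit and on cancellation alike.
        writer.close()

async def main():
    # Hypothetical server that accepts a connection and stays silent.
    async def silent(reader, writer):
        await reader.read()  # returns only when the client disconnects
        writer.close()

    server = await asyncio.start_server(silent, "127.0.0.1", 0)
    host, port = server.sockets[0].getsockname()[:2]

    task = asyncio.create_task(communicate(host, port))
    await asyncio.sleep(0.1)  # give the task time to connect and block
    task.cancel()             # request cancellation of the suspended task
    try:
        await task            # awaiting a canceled task raises CancelledError
    except asyncio.CancelledError:
        pass
    server.close()
    return task.cancelled()

was_cancelled = asyncio.run(main())
```

Keeping the task object around, rather than the bare (reader, writer) pair, is what makes this kind of control possible.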
The idea behind the asyncio stream API is to write such sequential-looking code, leaving it to asyncio to handle polling of file descriptors and scheduling of tasks. You can use combinators like asyncio.gather and asyncio.wait to run thousands of such lightweight coroutines concurrently.
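As a rough sketch of the asyncio.gather approach, the following runs 100 client coroutines concurrently against a local echo server. The `handle_echo` and `client` helpers, the 127.0.0.1 address with an OS-chosen port, and the message contents are all assumptions made to keep the example self-contained.

```python
import asyncio

async def handle_echo(reader, writer):
    # Hypothetical echo server, present only so the clients have a peer.
    while True:
        line = await reader.readline()
        if not line:
            break
        writer.write(line)
        await writer.drain()
    writer.close()

async def client(host, port, message):
    # One lightweight coroutine per connection.
    reader, writer = await asyncio.open_connection(host, port)
    writer.write(message + b"\n")
    await writer.drain()
    reply = await reader.readline()
    writer.close()
    await writer.wait_closed()
    return reply

async def main():
    server = await asyncio.start_server(handle_echo, "127.0.0.1", 0)
    host, port = server.sockets[0].getsockname()[:2]
    messages = [b"msg %d" % i for i in range(100)]
    # gather() runs all clients concurrently and returns the results
    # in the same order as the awaitables passed in.
    replies = await asyncio.gather(*(client(host, port, m) for m in messages))
    server.close()
    return replies

replies = asyncio.run(main())
```

Each client suspends at its own await points, so a single thread can interleave all of the connections.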
Is there a way to make a handler, when the reader is ready?
If you need a callback-based API, you should probably use the lower-level transports and protocols instead. However, if you are already working with streams, but still occasionally need an ordinary callback, you can get it by obtaining a Future:
future = asyncio.ensure_future(reader.read(1024))
future.add_done_callback(your_callback)
The future acts as a handle to the running coroutine. Once read would no longer block, the done-callback will be invoked by the event loop with a single argument, the future. At that point the future has finished, and its result() method either returns the received data or raises the exception that read raised.
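A runnable sketch of the done-callback pattern might look like the following. The `on_data` callback, the `received` list, and the local `serve` handler that sends b"hello" are hypothetical names introduced only for this illustration.

```python
import asyncio

received = []

def on_data(future):
    # The event loop calls this with the finished future as its only argument.
    try:
        received.append(future.result())
    except Exception as exc:
        # result() re-raises whatever exception read() failed with.
        received.append(exc)

async def main():
    # Hypothetical server that sends one short message and disconnects.
    async def serve(reader, writer):
        writer.write(b"hello")
        await writer.drain()
        writer.close()

    server = await asyncio.start_server(serve, "127.0.0.1", 0)
    host, port = server.sockets[0].getsockname()[:2]
    reader, writer = await asyncio.open_connection(host, port)

    future = asyncio.ensure_future(reader.read(1024))
    future.add_done_callback(on_data)

    # Awaiting here only keeps the loop alive until the read completes;
    # on_data was registered first, so it runs before main() resumes.
    await future
    writer.close()
    server.close()

asyncio.run(main())
```

In a long-running program there would be no need to await the future at all; the callback fires whenever the read completes.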
(The above applies to any coroutine or future-compatible object in asyncio, not just to StreamReader methods.)
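For comparison, the callback-based transports and protocols API mentioned earlier could be sketched as follows. Here data_received is the handler the event loop calls whenever data arrives. The `CollectingClient` and `EchoOnce` classes and the b"ping\n" payload are hypothetical, chosen only to make the example self-contained.

```python
import asyncio

class CollectingClient(asyncio.Protocol):
    def __init__(self, on_closed):
        self.chunks = []
        self.on_closed = on_closed

    def connection_made(self, transport):
        transport.write(b"ping\n")

    def data_received(self, data):
        # Invoked by the event loop each time bytes arrive.
        self.chunks.append(data)

    def connection_lost(self, exc):
        if not self.on_closed.done():
            self.on_closed.set_result(None)

class EchoOnce(asyncio.Protocol):
    # Hypothetical server side: echo the first chunk, then disconnect.
    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        self.transport.write(data)
        self.transport.close()  # buffered data is flushed before closing

async def main():
    loop = asyncio.get_running_loop()
    server = await loop.create_server(EchoOnce, "127.0.0.1", 0)
    host, port = server.sockets[0].getsockname()[:2]

    closed = loop.create_future()
    transport, protocol = await loop.create_connection(
        lambda: CollectingClient(closed), host, port
    )
    await closed  # wait until the server has disconnected
    server.close()
    return b"".join(protocol.chunks)

echoed = asyncio.run(main())
```

With this style there is no reader to await; all incoming data is pushed to the protocol's methods.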