How to detect write failure in asyncio?

Posted 2019-03-28 06:03

Question:

As a simple example, consider the network equivalent of /dev/zero, below. (Or more realistically, just a web server sending a large file.)

If a client disconnects early, you get a barrage of log messages:

WARNING:asyncio:socket.send() raised exception.

But I'm not finding any way to catch said exception. The hypothetical server continues reading gigabytes from disk and sending them to a dead socket, with no effort on the client's part, and you've got yourself a DoS attack.

The only thing I've found from the docs is to yield from a read, with an empty string indicating closure. But that's no good here because a normal client isn't going to send anything, blocking the write loop.

What's the right way to detect failed writes, or be notified that the TCP connection has been closed, with the streams API or otherwise?

Code:

from asyncio import *
import logging

@coroutine
def client_handler(reader, writer):
    while True:
        writer.write(bytes(1))
        yield from writer.drain()

logging.basicConfig(level=logging.INFO)
loop = get_event_loop()
coro = start_server(client_handler, '', 12345)
server = loop.run_until_complete(coro)
loop.run_forever()

Answer 1:

This is a little bit strange, but you can actually allow an exception to reach the client_handler coroutine by forcing it to yield control to the event loop for one iteration:

import asyncio
import logging

@asyncio.coroutine
def client_handler(reader, writer):
    while True:
        writer.write(bytes(1))
        yield  # Yield to the event loop
        yield from writer.drain()

logging.basicConfig(level=logging.INFO)
loop = asyncio.get_event_loop()
coro = asyncio.start_server(client_handler, '', 12345)
server = loop.run_until_complete(coro)
loop.run_forever()

If I do that, I get this output when I kill the client connection:

ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<client_handler() done, defined at aio.py:4> exception=ConnectionResetError(104, 'Connection reset by peer')>
Traceback (most recent call last):
  File "/usr/lib/python3.4/asyncio/tasks.py", line 238, in _step
    result = next(coro)
  File "aio.py", line 9, in client_handler
    yield from writer.drain()
  File "/usr/lib/python3.4/asyncio/streams.py", line 301, in drain
    raise exc
  File "/usr/lib/python3.4/asyncio/selector_events.py", line 700, in write
    n = self._sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer

I'm really not quite sure why you need to explicitly let the event loop get control for the exception to get through - don't have time at the moment to dig into it. I assume some bit needs to get flipped to indicate the connection dropped, and calling yield from writer.drain() (which can short-circuit going through the event loop) in a loop is preventing that from happening, but I'm really not sure. If I get a chance to investigate, I'll update the answer with that info.
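For readers on a recent Python, where @asyncio.coroutine is no longer available, the same idea can be sketched with async/await. This is a minimal sketch and not the code from this answer: await asyncio.sleep(0) stands in for the bare yield, CHUNK and the returned byte count are illustrative additions, and catching ConnectionError (the parent class of ConnectionResetError and BrokenPipeError) is an assumption about which failures matter. Newer asyncio releases may surface the error from drain() without the explicit yield; that is not verified here.

```python
import asyncio
import logging

CHUNK = bytes(64 * 1024)  # illustrative chunk size, not from the original


async def client_handler(reader, writer):
    """Stream zero bytes until a write fails; return the bytes sent before that."""
    sent = 0
    try:
        while True:
            writer.write(CHUNK)
            await asyncio.sleep(0)  # async/await counterpart of the bare `yield`
            await writer.drain()    # a lost connection is expected to raise here
            sent += len(CHUNK)
    except ConnectionError as exc:
        # Stop producing data as soon as the peer is known to be gone.
        logging.info("client went away after %d bytes: %r", sent, exc)
    finally:
        writer.close()
    return sent


async def main():
    server = await asyncio.start_server(client_handler, "", 12345)
    async with server:
        await server.serve_forever()

# To run the server: asyncio.run(main())
```

Catching the exception inside the handler also avoids the "Task exception was never retrieved" message shown above, because the task now finishes normally.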



Answer 2:

I did some digging into the asyncio source to expand on dano's answer on why the exceptions aren't being raised without explicitly passing control to the event loop. Here's what I've found.

Calling yield from writer.drain() hands control to the StreamWriter.drain coroutine. This coroutine checks for and raises any exception that the StreamReaderProtocol has set on the StreamReader. But since we passed control straight to drain, the protocol hasn't had a chance to set the exception yet. drain then hands control to the FlowControlMixin._drain_helper coroutine, which returns immediately because the relevant flags haven't been set yet either, and control ends up back in the coroutine that called yield from writer.drain().

And so we have gone full circle without ever giving control to the event loop, which would let it run other callbacks and coroutines and bubble the exception up to writer.drain().

Yielding before drain() gives the transport/protocol a chance to set the appropriate flags and exceptions.

Here's a mock-up of what's going on, with all the nested calls collapsed:

import asyncio as aio

def set_exception(ctx, exc):
  ctx["exc"] = exc

@aio.coroutine
def drain(ctx):
  if ctx["exc"] is not None:
    raise ctx["exc"]

  return

@aio.coroutine
def client_handler(ctx):
  i = 0
  while True:
    i += 1
    print("write", i)
    # yield  # Uncommenting this lets the callback scheduled with loop.call_later run.
    yield from drain(ctx)

CTX = {"exc": None}

loop = aio.get_event_loop()
# Set the exception in 5 seconds
loop.call_later(5, set_exception, CTX, Exception("connection lost"))
loop.run_until_complete(client_handler(CTX))
loop.close()

This should probably be fixed upstream in the Streams API by the asyncio developers.
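The mock-up above never terminates when the yield is left commented out, which makes the two behaviours hard to compare side by side. Below is a bounded variant in async/await syntax, offered as a rough sketch: write_loop, run_case, and max_writes are hypothetical names introduced here, loop.call_soon replaces the 5-second call_later so the demo finishes immediately, and ConnectionResetError stands in for the generic Exception.

```python
import asyncio


async def drain(ctx):
    # Stand-in for StreamWriter.drain(): raises a stored error, never suspends.
    if ctx["exc"] is not None:
        raise ctx["exc"]


async def write_loop(ctx, yield_to_loop, max_writes=1000):
    """Return the write count at which the error surfaced, or None if it never did."""
    for i in range(1, max_writes + 1):
        if yield_to_loop:
            await asyncio.sleep(0)  # suspend once so pending callbacks can run
        try:
            await drain(ctx)
        except ConnectionError:
            return i
    return None


async def run_case(yield_to_loop):
    ctx = {"exc": None}
    loop = asyncio.get_running_loop()
    # Schedule the "connection lost" notification, as a transport would.
    loop.call_soon(ctx.__setitem__, "exc", ConnectionResetError("connection lost"))
    return await write_loop(ctx, yield_to_loop)


if __name__ == "__main__":
    print("without yielding:", asyncio.run(run_case(False)))
    print("with sleep(0):   ", asyncio.run(run_case(True)))
```

Without the suspension point the scheduled callback never gets a turn, so all max_writes iterations complete and the error is never seen. With it, the callback runs during the first trip through the event loop and drain() raises shortly afterwards.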



Answer 3:

The stream-based API doesn't have a callback you can specify for when the connection is closed. But the Protocol API does (connection_lost), so use it instead: https://docs.python.org/3/library/asyncio-protocol.html#connection-callbacks
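As a rough illustration of that suggestion, here is what the /dev/zero server from the question might look like on top of the Protocol API. It is a sketch under stated assumptions rather than a tested production server: ZeroProtocol, CHUNK, and the _pump helper are hypothetical names introduced here, and the flow control relies on the documented pause_writing/resume_writing callbacks.

```python
import asyncio


class ZeroProtocol(asyncio.Protocol):
    """Send zero bytes until the peer disconnects (class name is illustrative)."""

    CHUNK = bytes(64 * 1024)  # illustrative chunk size

    def connection_made(self, transport):
        self.transport = transport
        self.paused = False
        self.lost = False
        self._pump()

    def pause_writing(self):
        # Called by the transport when its buffer passes the high-water mark.
        self.paused = True

    def resume_writing(self):
        # Called by the transport once the buffer has drained enough.
        self.paused = False
        self._pump()

    def connection_lost(self, exc):
        # exc is None on a clean close, or the error that broke the connection.
        self.lost = True

    def _pump(self):
        # Write until the transport pushes back or the connection is going away.
        while not (self.paused or self.lost or self.transport.is_closing()):
            self.transport.write(self.CHUNK)


async def main():
    loop = asyncio.get_running_loop()
    server = await loop.create_server(ZeroProtocol, "", 12345)
    async with server:
        await server.serve_forever()

# To run the server: asyncio.run(main())
```

The is_closing() check matters because connection_lost is delivered on a later event-loop iteration; without it, a write failure in the middle of _pump could leave the loop spinning on a dead transport.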