How to implement timeout in asyncio server?

Posted 2020-07-22 09:42

Question:

Below is a simple echo server. But if the client does not send anything for 10 seconds, I want to close the connection.

import asyncio


async def process(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    print("awaiting for data")
    line = await reader.readline()
    print(f"received {line}")
    writer.write(line)
    print(f"sent {line}")
    await writer.drain()
    print(f"Drained")


async def timeout(task: asyncio.Task, duration):
    print("timeout started")
    await asyncio.sleep(duration)
    print("client unresponsive, cancelling")
    task.cancel()
    print("task cancelled")


async def new_session(reader, writer):
    print("new session started")
    task = asyncio.create_task(process(reader, writer))
    timer = asyncio.create_task(timeout(task, 10))
    await task
    print("task complete")
    timer.cancel()
    print("timer cancelled")
    writer.close()
    print("writer closed")


async def a_main():
    server = await asyncio.start_server(new_session, port=8088)
    await server.serve_forever()


if __name__ == '__main__':
    asyncio.run(a_main())

If the client sends a message, it works fine. But in the other case, when the client is silent, it does not work.

When the client sends a message:

new session started
awaiting for data
timeout started
received b'slkdfjsdlkfj\r\n'
sent b'slkdfjsdlkfj\r\n'
Drained
task complete
timer cancelled
writer closed

When the client is silent after opening the connection:

new session started
awaiting for data
timeout started
client unresponsive, cancelling
task cancelled

The lines "task complete", "timer cancelled" and "writer closed" are never printed.

  1. What is the issue with the above code?
  2. Is there a better way to implement timeouts?

Update

Figured out the problem. The task was in fact cancelled, but the resulting CancelledError was raised at "await task" in new_session and then silently ignored, so the rest of new_session never ran. Fixed it by catching CancelledError:

async def new_session(reader, writer):
    print("new session started")
    task = asyncio.create_task(process(reader, writer))
    timer = asyncio.create_task(timeout(task, 10))
    try:
        await task
    except asyncio.CancelledError:
        print(f"Task took too long and was cancelled by timer")
    print("task complete")
    timer.cancel()
    print("timer cancelled")
    writer.close()
    print("writer closed")
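
A minimal sketch of why the first version stopped after "task cancelled": when a task is cancelled, whoever is awaiting it gets CancelledError raised at the await. Here wait_forever is a hypothetical stand-in for process() blocked in reader.readline(); it is not part of the server code.

```python
import asyncio


async def wait_forever():
    # Hypothetical stand-in for process() waiting on a silent client
    await asyncio.Event().wait()


async def demo():
    task = asyncio.create_task(wait_forever())
    await asyncio.sleep(0)  # give the task a chance to start
    task.cancel()
    try:
        await task  # CancelledError is raised here, in the awaiter
    except asyncio.CancelledError:
        return "awaiter saw CancelledError"
    return "awaiter continued normally"


result = asyncio.run(demo())
print(result)  # awaiter saw CancelledError
```

Without the try/except, nothing after "await task" runs, which matches the missing log lines in the silent-client case.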

Second part still remains. Is there a better way to implement timeouts?


Update 2

Complete code using wait_for. The timeout coroutine is no longer needed. See the accepted answer below:

async def new_session(reader, writer):
    print("new session started")
    try:
        await asyncio.wait_for(process(reader, writer), timeout=5)
    except asyncio.TimeoutError as te:
        print(f'time is up!{te}')
    finally:
        writer.close()
        print("writer closed")

Answer 1:

Is there a better way to implement timeouts?

You can use asyncio.wait_for instead of your own timeout coroutine. It has similar semantics but already comes with asyncio. Awaiting it returns the result of the wrapped awaitable, or cancels that awaitable and raises asyncio.TimeoutError if the timeout elapses first.
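
If the goal is to drop a client that stays silent between messages, one option is to apply wait_for to each readline() rather than to the whole session. This is only a sketch under assumptions: echo_until_idle and IDLE_TIMEOUT are made-up names, the 10 seconds comes from the question, and the handler loops over lines where the original process() handles a single one.

```python
import asyncio

IDLE_TIMEOUT = 10  # seconds of silence allowed, taken from the question


async def echo_until_idle(reader, writer, idle_timeout=IDLE_TIMEOUT):
    """Echo lines back until the client disconnects or stays silent too long."""
    try:
        while True:
            # wait_for cancels the pending readline() and raises
            # asyncio.TimeoutError if no complete line arrives in time.
            line = await asyncio.wait_for(reader.readline(), timeout=idle_timeout)
            if not line:  # empty bytes: the client closed the connection
                break
            writer.write(line)
            await writer.drain()
    except asyncio.TimeoutError:
        print("client idle for too long, closing")
    finally:
        # Runs on timeout, on disconnect and on errors alike
        writer.close()
        await writer.wait_closed()
```

It has the same (reader, writer) signature as new_session, so it could be passed to asyncio.start_server in its place. The timer restarts on every read, so an active client is not cut off after a fixed total time.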



Answer 2:

I use the following code when making a connection. I'd suggest using wait_for similarly for your code.

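# Runs inside an async method; self.host, self.port and self.connection_timeout are set elsewhere.
# The loop= argument is left out: it was deprecated in Python 3.8 and removed in 3.10.
fut = asyncio.open_connection(self.host, self.port)
try:
    r, w = await asyncio.wait_for(fut, timeout=self.connection_timeout)
except asyncio.TimeoutError:
    # r and w are not assigned if the connection attempt times out
    pass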