Python asyncio force timeout

Posted 2020-02-28 07:38

Question:

Using asyncio, a coroutine can be run with a timeout so that it is cancelled once the timeout expires:

import asyncio

@asyncio.coroutine
def coro():
    yield from asyncio.sleep(10)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(coro(), 5))

The above example works as expected (it times out after 5 seconds).

However, when the coroutine doesn't use asyncio.sleep() (or other asyncio coroutines) it doesn't seem to time out. Example:

import asyncio
import time

@asyncio.coroutine
def coro():
    time.sleep(10)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(coro(), 1))

This takes more than 10 seconds to run because the time.sleep(10) isn't cancelled. Is it possible to enforce the cancellation of the coroutine in such a case?

If asyncio should be used to solve this, how could I do that?

Answer 1:

No, you can't interrupt a coroutine unless it yields control back to the event loop, which means it needs to be suspended inside a yield from expression. asyncio is single-threaded, so while you're blocking on the time.sleep(10) call in your second example, there's no way for the event loop to run. That means that when the timeout you set using wait_for expires, the event loop can't act on it. The event loop doesn't get an opportunity to run again until coro exits, at which point it's too late.
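The difference can be measured directly. The following is a minimal sketch, assuming Python 3.7+ and the newer async/await syntax rather than the generator-based style used in the question; the helper names blocking, cooperative and timed are made up for this illustration.

```python
import asyncio
import time


async def blocking():
    time.sleep(0.3)  # blocks the whole event loop; never yields


async def cooperative():
    await asyncio.sleep(0.3)  # yields to the event loop while waiting


async def timed(coro_fn, timeout):
    """Run coro_fn() under wait_for; report the outcome and the elapsed time."""
    start = time.monotonic()
    try:
        await asyncio.wait_for(coro_fn(), timeout)
        outcome = "finished"
    except asyncio.TimeoutError:
        outcome = "timed out"
    return outcome, time.monotonic() - start


if __name__ == "__main__":
    for fn in (cooperative, blocking):
        outcome, elapsed = asyncio.run(timed(fn, 0.05))
        print(f"{fn.__name__}: {outcome} after {elapsed:.2f}s")
```

The cooperative version should return shortly after the 0.05 second timeout, while the blocking version cannot return before time.sleep does, whatever timeout is passed.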

This is why, in general, you should avoid blocking calls inside a coroutine: any time a call blocks without yielding to the event loop, nothing else in your program can execute, which is probably not what you want. If you really need to do a long, blocking operation, use BaseEventLoop.run_in_executor to run it in a thread or process pool, which avoids blocking the event loop:

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

@asyncio.coroutine
def coro(loop):
    ex = ProcessPoolExecutor(2)
    # Waiting on the result can be interrupted; the worker itself keeps running until time.sleep returns.
    yield from loop.run_in_executor(ex, time.sleep, 10)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(coro(loop), 1))
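For readers on newer Python versions, here is a rough equivalent of the same idea, assuming Python 3.7+ with async/await and the default thread pool instead of a ProcessPoolExecutor. The names blocking_work and run_with_timeout are hypothetical. The sketch also records that the timeout only ends the waiting: the worker thread is not stopped and still runs to completion.

```python
import asyncio
import threading
import time


def blocking_work(done, seconds):
    """Stand-in for a long blocking call; sets `done` when it finishes."""
    time.sleep(seconds)
    done.set()


async def run_with_timeout(seconds, timeout):
    loop = asyncio.get_running_loop()
    done = threading.Event()
    start = time.monotonic()
    try:
        # None selects the loop's default executor (a thread pool).
        await asyncio.wait_for(
            loop.run_in_executor(None, blocking_work, done, seconds), timeout
        )
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    waited = time.monotonic() - start
    return timed_out, waited, done


if __name__ == "__main__":
    timed_out, waited, done = asyncio.run(run_with_timeout(0.4, 0.05))
    print(f"timed out: {timed_out}, waited {waited:.2f}s")
    print("worker finished anyway:", done.wait(2))
```

If the blocking work must really be stopped rather than abandoned, that has to be arranged separately, for example by running it in a process that can be terminated.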


Answer 2:

Thanks @dano for your answer. If running a coroutine is not a hard requirement, here is a reworked, more compact version:

import asyncio
import time

timeout = 0.5
loop = asyncio.get_event_loop()
future = asyncio.wait_for(loop.run_in_executor(None, time.sleep, 2), timeout)
try:
    loop.run_until_complete(future)
    print('Thx for letting me sleep')
# wait_for raises asyncio.TimeoutError; on Python 3.8-3.10 this is a
# different class from concurrent.futures.TimeoutError.
except asyncio.TimeoutError:
    print('I need more sleep !')

For the curious, a little debugging in my Python 3.5.2 showed that passing None as an executor results in the creation of a _default_executor, as follows:

# _MAX_WORKERS = 5
self._default_executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
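The effect of passing None can be observed without reading the asyncio source. This is a small sketch, assuming Python 3.7+; where_does_it_run is a made-up helper name, and the worker thread's name varies between Python versions, so the code only checks that the call did not run on the main thread.

```python
import asyncio
import threading


async def where_does_it_run():
    loop = asyncio.get_running_loop()
    # With executor=None the call is handed to the loop's default executor.
    worker = await loop.run_in_executor(None, threading.current_thread)
    return worker is threading.main_thread(), worker.name


if __name__ == "__main__":
    on_main, name = asyncio.run(where_does_it_run())
    print(f"ran on main thread: {on_main}, worker thread name: {name}")
```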


Answer 3:

The examples I've seen for timeout handling are very trivial. In reality, my app is a bit more complex. The sequence is:

  1. When a client connects to the server, have the server create another connection to an internal server.
  2. When the internal server connection is OK, wait for the client to send data. Based on this data, we may make a query to the internal server.
  3. When there is data to send to the internal server, send it. Since the internal server sometimes doesn't respond fast enough, wrap this request in a timeout.
  4. If the operation times out, close all connections to signal the error to the client.

To achieve all of the above while keeping the event loop running, the resulting code contains the following:

def connection_made(self, transport):
    self.client_lock_coro = self.client_lock.acquire()
    asyncio.ensure_future(self.client_lock_coro).add_done_callback(self._got_client_lock)

def _got_client_lock(self, task):
    task.result()  # Always True here; calling result() re-raises any exception from acquire()
    coro = self.loop.create_connection(lambda: ClientProtocol(self),
                                       self.connect_info[0], self.connect_info[1])
    asyncio.ensure_future(asyncio.wait_for(coro, self.client_connect_timeout)
                          ).add_done_callback(self.connected_server)

def connected_server(self, task):
    transport, client_object = task.result()
    self.client_transport = transport
    self.client_lock.release()

def data_received(self, data_in):
    message = data_in  # assumption: the real app may build the message from data_in first
    asyncio.ensure_future(self.send_to_real_server(message, self.client_send_timeout))

@asyncio.coroutine
def send_to_real_server(self, message, timeout=5.0):
    yield from self.client_lock.acquire()
    asyncio.ensure_future(asyncio.wait_for(self._send_to_real_server(message),
                                           timeout, loop=self.loop)
                          ).add_done_callback(self.sent_to_real_server)

@asyncio.coroutine
def _send_to_real_server(self, message):
    self.client_transport.write(message)

def sent_to_real_server(self, task):
    task.result()
    self.client_lock.release()
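The lock-then-send-with-timeout step from the code above can also be written without done callbacks. What follows is only a sketch of that one step, assuming Python 3.7+ and async/await; send_with_timeout, on_timeout and the fake send coroutines are hypothetical stand-ins and not part of the original application.

```python
import asyncio


async def send_with_timeout(lock, send, message, timeout, on_timeout):
    """Send one message while holding the lock; call on_timeout() if it takes too long."""
    async with lock:  # the lock is released even if the send fails or times out
        try:
            await asyncio.wait_for(send(message), timeout)
            return True
        except asyncio.TimeoutError:
            on_timeout()  # e.g. close the client and internal-server transports
            return False


async def demo():
    lock = asyncio.Lock()
    sent, closed = [], []

    async def fast_send(message):  # stand-in for a responsive internal server
        sent.append(message)

    async def slow_send(message):  # stand-in for an internal server that stalls
        await asyncio.sleep(0.5)
        sent.append(message)

    ok = await send_with_timeout(lock, fast_send, b"ping", 0.2,
                                 lambda: closed.append("fast"))
    late = await send_with_timeout(lock, slow_send, b"pong", 0.05,
                                   lambda: closed.append("slow"))
    return ok, late, sent, closed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Using async with for the lock is a design choice of this sketch: it guarantees the release on every path, including the timeout path.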