Timeout for Python coroutines

Posted 2019-02-15 17:35

Question:

How can I make a coroutine stop after a timeout?

I don't understand why asyncio.wait_for() doesn't work for me. I have this piece of code (I'm planning to write my own telnet client):

def expect(self, pattern, timeout=20): 
    if isinstance(pattern, str):
        pattern = pattern.encode('ascii', 'ignore')        
    return self.loop.run_until_complete(asyncio.wait_for(self.asyncxpect(pattern), timeout))

async def asyncxpect(self, pattern): #receives data in a cumulative way until match is found
    regexp = re.compile(rb'(?P<payload>[\s\S]*)(?P<pattern>%s)' % pattern)
    self.buffer = b''
    while True:
        # add timeout
        # add exception handling for unexpectedly closed connections
        data = await self.loop.sock_recv(self.sock, 10000) 
        self.buffer += data
        m = re.match(regexp, self.buffer)
        if m:
            payload = m.group('payload')
            match = m.group('pattern')
            return payload, match 

As I understand it, this code returns control to the event loop at some point (at the await statement). I thought that should happen when there is no more data to receive. And once the event loop has control, it can stop the coroutine when the timeout expires.

But if the server doesn't send anything useful (anything that matches), my code just gets stuck in this loop, right at the await point.

I think this is different from the question "Python asyncio force timeout", because I'm not using blocking calls like time.sleep(n).

Here is my code

Answer 1:

When the server closes the connection, sock_recv returns an empty bytes object (b''), indicating end of file. Since you don't handle that condition, your code ends up stuck in an infinite loop processing the same buffer.

To correct it, add something like:

if data == b'':
    break

...after the data = await loop.sock_recv(...) line.
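As a rough illustration of where that check goes, here is a minimal standalone sketch of the receive loop with EOF handling. The names recv_until and demo are hypothetical and not from the question. Raising ConnectionError instead of using break is an assumption of this sketch, chosen so the caller can tell EOF apart from a match. The demo uses socket.socketpair() as a stand-in for the telnet connection.

```python
import asyncio
import re
import socket


async def recv_until(loop, sock, pattern):
    """Accumulate data until `pattern` (bytes) matches; raise on EOF."""
    regexp = re.compile(rb'(?P<payload>[\s\S]*)(?P<pattern>%s)' % pattern)
    buffer = b''
    while True:
        data = await loop.sock_recv(sock, 10000)
        if data == b'':
            # Peer closed the connection: stop instead of looping forever.
            raise ConnectionError('connection closed before pattern matched')
        buffer += data
        m = regexp.match(buffer)
        if m:
            return m.group('payload'), m.group('pattern')


async def demo():
    # Hypothetical test harness: a local socket pair replaces the server.
    loop = asyncio.get_running_loop()
    client, server = socket.socketpair()
    client.setblocking(False)
    server.setblocking(False)
    try:
        await loop.sock_sendall(server, b'hello login: ')
        found = await asyncio.wait_for(
            recv_until(loop, client, b'login: '), timeout=1)
        server.close()  # the client now sees end of file
        try:
            await asyncio.wait_for(
                recv_until(loop, client, b'never'), timeout=1)
            eof_detected = False
        except ConnectionError:
            eof_detected = True
        return found, eof_detected
    finally:
        client.close()
        server.close()
```

Running asyncio.run(demo()) exercises both paths: a successful match, then a closed connection that ends the loop instead of spinning.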

But the above still doesn't explain why wait_for is unable to cancel the rogue coroutine. The problem is that await doesn't mean "pass control to the event loop", as it is sometimes depicted. It means "request value from the provided awaitable object, yielding control to the event loop if the object indicates that it does not have a value ready." The if is crucial: if the object does have a value ready, this value will be used immediately without ever deferring to the event loop. In other words, await doesn't guarantee that the event loop will get a chance to run.

For example, the following coroutine completely blocks the event loop and prevents any other coroutine from ever running, despite its inner loop consisting of nothing but awaiting:

async def busy_loop():
    while True:
        await noop()

async def noop():
    pass
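The effect can be observed without hanging the interpreter by bounding the loop. The following sketch uses hypothetical helper names (bounded_busy_loop, ticker, demo) and counts how often a background task gets to run while another coroutine does nothing but await noop().

```python
import asyncio


async def noop():
    pass


async def bounded_busy_loop(iterations, ticks):
    # Same shape as busy_loop, but bounded so the demo terminates.
    for _ in range(iterations):
        await noop()  # completes immediately, so no suspension happens
    # How often the ticker ran while this loop was "awaiting".
    return ticks['count']


async def ticker(ticks):
    # Background task that counts each time the event loop runs it.
    while True:
        ticks['count'] += 1
        await asyncio.sleep(0)


async def demo():
    ticks = {'count': 0}
    task = asyncio.ensure_future(ticker(ticks))  # scheduled, not yet run
    seen_during_loop = await bounded_busy_loop(100_000, ticks)
    await asyncio.sleep(0)  # a real suspension: the ticker can run now
    seen_after_yield = ticks['count']
    task.cancel()
    return seen_during_loop, seen_after_yield
```

With asyncio.run(demo()), the first value stays at zero because the ticker never ran during the 100,000 awaits. It only gets a turn once demo() actually suspends in asyncio.sleep(0).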

In your example, since sock_recv returns immediately once the socket is at end-of-file, the coroutine is never suspended, and (in combination with the above bug) it never exits.

To guarantee that other tasks get a chance to run, you can add await asyncio.sleep(0) inside the loop. This should not be necessary for most code, where requesting IO data will soon result in a wait, at which point the event loop will kick in. It is only in combination with the EOF-handling bug that the code gets stuck.
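A minimal sketch of that workaround, assuming a loop that would otherwise never suspend: with asyncio.sleep(0) in each iteration, the event loop regains control and wait_for can cancel the coroutine. The names polite_loop and demo are hypothetical.

```python
import asyncio


async def noop():
    pass


async def polite_loop():
    while True:
        await noop()            # returns immediately, no suspension
        await asyncio.sleep(0)  # suspends once per iteration


async def demo():
    try:
        # The timeout can fire because polite_loop yields to the event loop.
        await asyncio.wait_for(polite_loop(), timeout=0.05)
    except asyncio.TimeoutError:
        return 'timed out'
    return 'finished'
```

Calling asyncio.run(demo()) returns after roughly the timeout. Without the sleep(0) line, the same call would never return.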