Please explain “Task was destroyed but it is pendi

2019-04-19 07:35发布

问题:

Python 3.4.2

I am learning asyncio and I use it to continously listen IPC bus, while gbulb listens to the dbus.

Some side notes:

So I created a function listen_to_ipc_channel_layer that continously listens for incoming messages on the IPC channel and passes the message to a message_handler.

I am also listening to SIGTERM and SIGINT. So when I send a SIGTERM to the python process running the code you find at the bottom, the script should terminate gracefully.

The problem

… I am having is the following warning:

got signal 15: exit
Task was destroyed but it is pending!
task: <Task pending coro=<listen_to_ipc_channel_layer() running at /opt/mainloop-test.py:23> wait_for=<Future cancelled>>

Process finished with exit code 0

… with the following code:

import asyncio
import gbulb
import signal
import asgi_ipc as asgi

def main():
    asyncio.async(listen_to_ipc_channel_layer())
    loop = asyncio.get_event_loop()

    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, ask_exit)

    # Start listening on the Linux IPC bus for incoming messages
    loop.run_forever()
    loop.close()

@asyncio.coroutine
def listen_to_ipc_channel_layer():
    """Listens to the Linux IPC bus for messages"""
    while True:
        message_handler(message=channel_layer.receive(["my_channel"]))
        try:
            yield from asyncio.sleep(0.1)
        except asyncio.CancelledError:
            break

def ask_exit():
    loop = asyncio.get_event_loop()
    for task in asyncio.Task.all_tasks():
        task.cancel()
    loop.stop()


if __name__ == "__main__":
    gbulb.install()
    # Connect to the IPC bus
    channel_layer = asgi.IPCChannelLayer(prefix="my_channel")
    main()

I still only understand very little of asyncio, but I think I know what is going on. While waiting for yield from asyncio.sleep(0.1) the signal handler caught the SIGTERM and in that process it calls task.cancel().

Question thrown in: Shouldn't this trigger the CancelledError within the while True: loop? (Because it is not, but that is how I understand "Calling cancel() will throw a CancelledError to the wrapped coroutine").

Eventually loop.stop() is called which stops the loop without waiting for either yield from asyncio.sleep(0.1) to return a result or even the whole coroutine listen_to_ipc_channel_layer.

Please correct me if I am wrong.

I think the only thing I need to do is to make my program wait for the yield from asyncio.sleep(0.1) to return a result and/or coroutine to break out the while-loop and finish.

I believe I confuse a lot of things. Please help me get those things straight so that I can figure out how to gracefully close the event loop without warning.

回答1:

The problem comes from closing the loop immediately after cancelling the tasks. As the cancel() docs state

"This arranges for a CancelledError to be thrown into the wrapped coroutine on the next cycle through the event loop."

Take this snippet of code:

import asyncio
import signal


async def pending_doom():
    await asyncio.sleep(2)
    print(">> Cancelling tasks now")
    for task in asyncio.Task.all_tasks():
        task.cancel()

    print(">> Done cancelling tasks")
    asyncio.get_event_loop().stop()


def ask_exit():
    for task in asyncio.Task.all_tasks():
        task.cancel()


async def looping_coro():
    print("Executing coroutine")
    while True:
        try:
            await asyncio.sleep(0.25)
        except asyncio.CancelledError:
            print("Got CancelledError")
            break

        print("Done waiting")

    print("Done executing coroutine")
    asyncio.get_event_loop().stop()


def main():
    asyncio.async(pending_doom())
    asyncio.async(looping_coro())

    loop = asyncio.get_event_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, ask_exit)

    loop.run_forever()

    # I had to manually remove the handlers to
    # avoid an exception on BaseEventLoop.__del__
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.remove_signal_handler(sig)


if __name__ == '__main__':
    main()

Notice ask_exit cancels the tasks but does not stop the loop, on the next cycle looping_coro() stops it. The output if you cancel it is:

Executing coroutine
Done waiting
Done waiting
Done waiting
Done waiting
^CGot CancelledError
Done executing coroutine

Notice how pending_doom cancels and stops the loop immediately after. If you let it run until the pending_doom coroutines awakes from the sleep you can see the same warning you're getting:

Executing coroutine
Done waiting
Done waiting
Done waiting
Done waiting
Done waiting
Done waiting
Done waiting
>> Cancelling tasks now
>> Done cancelling tasks
Task was destroyed but it is pending!
task: <Task pending coro=<looping_coro() running at canceling_coroutines.py:24> wait_for=<Future cancelled>>


回答2:

The meaning of the issue is that a loop doesn't have time to finish all the tasks.

This arranges for a CancelledError to be thrown into the wrapped coroutine on the next cycle through the event loop.

There is no chance to do a "next cycle" of the loop in your approach. To make it properly you should move a stop operation to a separate non-cyclic coroutine to give your loop a chance to finish.

Second significant thing is CancelledError raising.

Unlike Future.cancel(), this does not guarantee that the task will be cancelled: the exception might be caught and acted upon, delaying cancellation of the task or preventing cancellation completely. The task may also return a value or raise a different exception.

Immediately after this method is called, cancelled() will not return True (unless the task was already cancelled). A task will be marked as cancelled when the wrapped coroutine terminates with a CancelledError exception (even if cancel() was not called).

So after cleanup your coroutine must raise CancelledError to be marked as cancelled.

Using an extra coroutine to stop the loop is not an issue because it is not cyclic and be done immediately after execution.

def main():                                              
    loop = asyncio.get_event_loop()                      
    asyncio.ensure_future(listen_to_ipc_channel_layer()) 

    for sig in (signal.SIGINT, signal.SIGTERM):          
        loop.add_signal_handler(sig, ask_exit)           
    loop.run_forever()                                   
    print("Close")                                       
    loop.close()                                         


@asyncio.coroutine                                       
def listen_to_ipc_channel_layer():                       
    while True:                                          
        try:                                             
            print("Running")                                 
            yield from asyncio.sleep(0.1)                
        except asyncio.CancelledError as e:              
            print("Break it out")                        
            raise e # Raise a proper error


# Stop the loop concurrently           
@asyncio.coroutine                                       
def exit():                                              
    loop = asyncio.get_event_loop()                      
    print("Stop")                                        
    loop.stop()                                          


def ask_exit():                          
    for task in asyncio.Task.all_tasks():
        task.cancel()                    
    asyncio.ensure_future(exit())        


if __name__ == "__main__":               
    main()