Waiting for a task to complete after KeyboardInter

2020-07-27 06:32发布

问题:

I'm trying to understand how does asyncio work. In my scenario client makes a tcp connection to the server, sends a login string, if authenticated - receives a stream of chars. Finally on KeyboardInterrupt sends logoff string to the server and hapilly disconnects.

Currently I'm stuck on the final part as my logoff method/task is destroyed before it has a chance to complete.

^CTask was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
  File "tst.py", line 101, in <module>
    client.login()
  File "tst.py", line 29, in login
    logoff_tsk = self.loop.create_task(self.logoff())
task: <Task pending coro=<logoff() running at tst.py:49> cb=[myClass._shutdown()] created at tst.py:29>

Below is the code which produces this error:

from functools import partial
import asyncio

class myClass:
    def __init__(self, *a, **kw):
        self.transport = None
        self.protocol = None
        self.usr = str(kw.get("usr", ""))
        self.pwd = str(kw.get("pwd", ""))
        self.host = str(kw.get("host", "")) or "127.0.0.2"
        self.port = int(kw.get("port", 0)) or 5038
        self.loop = asyncio.get_event_loop()
        self.loop.set_debug(enabled=True)

    def reactor(self, recv):
        print("## ~/~ From reactor: {!r}".format(recv.decode()))

    def login(self):
        connection_tsk = self.loop.create_task(self.loop.create_connection(
            partial(
                myProtocol,
                reactor_func=self.reactor),
            host=self.host,
            port=self.port))
        connection_tsk.add_done_callback(self.set_transport_protocol)
        try:
            self.loop.run_forever()
        except KeyboardInterrupt:
            logoff_tsk = self.loop.create_task(self.logoff())
            logoff_tsk.add_done_callback(self._shutdown)

    def set_transport_protocol(self, fut):
        print("AmiCtl.set_transport_protocol")
        transport, protocol = fut.result()
        self.transport = transport
        self.protocol = protocol
        self.loop.call_soon(self._login)

    def _login(self):
        login_string = self.cmd("Login")
        self.loop.create_task(self.transmit(login_string))

    @asyncio.coroutine
    def transmit(self, cmd):
        if self.transport:
            print("## ~/~ Sending data: {!r}".format(cmd))
            self.transport.write(cmd)

    @asyncio.coroutine
    def logoff(self):
        command = self.cmd("Logoff")
        yield from asyncio.shield(self.transmit(command))

    def _shutdown(self):
        if self.transport:
            self.transport.close()
        self.loop.stop()
        self.loop.close()
        print("\n{!r}".format(self.loop))

    def cmd(self, action):
        """
        Produce login/logoff string.
        """

class myProtocol(asyncio.Protocol):
    def __init__(self, reactor_func=None):
        self.reactor_func = reactor_func
        self.loop = asyncio.get_event_loop()

    def connection_made(self, transport):
        self.transport = transport
        peername = transport.get_extra_info("peername")
        print("## ~/~ Connection made: {peer}".format(peer=peername))

    def data_received(self, data):
        if callable(self.reactor_func):
            self.reactor_func(data)

    def connection_lost(self, exc):
        print("## ~/~ Lost connection to the server!!")
        self.loop.stop()


client = myClass(usr="my_usr", pwd="my_pwd")
client.login()

How can I improve/fix my code please?

回答1:

I will not refer to you code as I think your question is a xy-question. (http://xyproblem.info), But I will try to answer in a more generic way. What I am reading is that you are asking how to correctly close down an asyncio application.

I have set up a small example that I think with some explanation will get you on the right track. Read the code below, my hope is that it can relate to your use case and I will explain below.

import asyncio
import signal

loop = asyncio.get_event_loop()


class Connector:

    def __init__(self):
        self.closing = False
        self.closed = asyncio.Future()
        task = loop.create_task(self.connection_with_client())
        task.add_done_callback(self.closed.set_result)


    async def connection_with_client(self):
        while not self.closing:
            print('Read/write to open connection')
            await asyncio.sleep(1)

        print('I will now close connection')
        await asyncio.sleep(1)


conn = Connector()


def stop(loop):
    conn.closing = True
    print("from here I will wait until connection_with_client has finished")
    conn.closed.add_done_callback(lambda _: loop.stop())

loop.add_signal_handler(signal.SIGINT, stop, loop)
loop.run_forever()
loop.close()

What you are asking for is actually not trivial, one of the hardest things to manage doing asyncio is correctly closing an asyncio application.

To do managed closing you have to have a coroutine in place that will handle shutdown and set af future as done, when it has made sure that everything is shutdown. In my example it is task.add_done_callback(self.closed.set_result) but this future could be set in other ways.

Second, you have to add a signal handler that will run a non-asynchronously (normal) function and schedule a callback, that triggers the loop to close/stop when your 'close future' is done.

Take a look at my code and toy around with it until you understand the flow. I dont blame you for asking, in my opinion one of the hardest things doing asyncio is to keep track of 'loose' coroutines, that will result in unclean shutdown.

When I did this 'exercise' my first time and needed to understand the principles I went though the code of https://github.com/aio-libs/aioredis/blob/master/aioredis/connection.py#L93-L95 If you take the time to read the code these guys handle your exact problem in a very beautiful way, I know the code is a but complex, but take a mug of coffee and follow the method calls around until it makes sense.

I hope this helps you, post more details or comments if I am unclear. And take you time to understand this topic (the asycnio shutdown things). When you master shutdown management with asyncio you are not a asyncio-padawan anymore ;)

You may think, Thats alot of code to shutdown clean, but to my understanding, this is the way to do it(and more or less the only way for bigger and more complex applications).

Best of luck.