I'm trying to understand how does asyncio
work. In my scenario client makes a tcp
connection to the server, sends a login string, if authenticated - receives a stream of chars. Finally on KeyboardInterrupt
sends logoff
string to the server and hapilly disconnects.
Currently I'm stuck on the final part as my logoff method/task is destroyed before it has a chance to complete.
^CTask was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
File "tst.py", line 101, in <module>
client.login()
File "tst.py", line 29, in login
logoff_tsk = self.loop.create_task(self.logoff())
task: <Task pending coro=<logoff() running at tst.py:49> cb=[myClass._shutdown()] created at tst.py:29>
Below is the code which produces this error:
from functools import partial
import asyncio
class myClass:
def __init__(self, *a, **kw):
self.transport = None
self.protocol = None
self.usr = str(kw.get("usr", ""))
self.pwd = str(kw.get("pwd", ""))
self.host = str(kw.get("host", "")) or "127.0.0.2"
self.port = int(kw.get("port", 0)) or 5038
self.loop = asyncio.get_event_loop()
self.loop.set_debug(enabled=True)
def reactor(self, recv):
print("## ~/~ From reactor: {!r}".format(recv.decode()))
def login(self):
connection_tsk = self.loop.create_task(self.loop.create_connection(
partial(
myProtocol,
reactor_func=self.reactor),
host=self.host,
port=self.port))
connection_tsk.add_done_callback(self.set_transport_protocol)
try:
self.loop.run_forever()
except KeyboardInterrupt:
logoff_tsk = self.loop.create_task(self.logoff())
logoff_tsk.add_done_callback(self._shutdown)
def set_transport_protocol(self, fut):
print("AmiCtl.set_transport_protocol")
transport, protocol = fut.result()
self.transport = transport
self.protocol = protocol
self.loop.call_soon(self._login)
def _login(self):
login_string = self.cmd("Login")
self.loop.create_task(self.transmit(login_string))
@asyncio.coroutine
def transmit(self, cmd):
if self.transport:
print("## ~/~ Sending data: {!r}".format(cmd))
self.transport.write(cmd)
@asyncio.coroutine
def logoff(self):
command = self.cmd("Logoff")
yield from asyncio.shield(self.transmit(command))
def _shutdown(self):
if self.transport:
self.transport.close()
self.loop.stop()
self.loop.close()
print("\n{!r}".format(self.loop))
def cmd(self, action):
"""
Produce login/logoff string.
"""
class myProtocol(asyncio.Protocol):
def __init__(self, reactor_func=None):
self.reactor_func = reactor_func
self.loop = asyncio.get_event_loop()
def connection_made(self, transport):
self.transport = transport
peername = transport.get_extra_info("peername")
print("## ~/~ Connection made: {peer}".format(peer=peername))
def data_received(self, data):
if callable(self.reactor_func):
self.reactor_func(data)
def connection_lost(self, exc):
print("## ~/~ Lost connection to the server!!")
self.loop.stop()
client = myClass(usr="my_usr", pwd="my_pwd")
client.login()
How can I improve/fix my code please?