I'm trying to write a Server Side Events server which I can connect to with telnet and have the telnet content be pushed to a browser. The idea behind using Python and asyncio is to use as little CPU as possible as this will be running on a Raspberry Pi.
So far I have the following which uses a library found here: https://pypi.python.org/pypi/asyncio-sse/0.1 which uses asyncio.
And I have also copied a telnet server which uses asyncio as well.
Both work separately, but I have no idea how to tie both together. As I understand it, I need to call send()
in the SSEHandler
class from inside Telnet.data_received
, but I don't know how to access it. Both of these 'servers' need to be running in a loop to accept new connections, or push data.
Can anyone help, or point me in another direction?
import asyncio
import sse
# Get an instance of the asyncio event loop
loop = asyncio.get_event_loop()
# Setup SSE address and port
sse_host, sse_port = '192.168.2.25', 8888
class Telnet(asyncio.Protocol):
def connection_made(self, transport):
print("Connection received!");
self.transport = transport
def data_received(self, data):
print(data)
self.transport.write(b'echo:')
self.transport.write(data)
# This is where I want to send data via SSE
# SSEHandler.send(data)
# Things I've tried :(
#loop.call_soon_threadsafe(SSEHandler.handle_request());
#loop.call_soon_threadsafe(sse_server.send("PAH!"));
def connection_lost(self, esc):
print("Connection lost!")
telnet_server.close()
class SSEHandler(sse.Handler):
@asyncio.coroutine
def handle_request(self):
self.send('Working')
# SSE server
sse_server = sse.serve(SSEHandler, sse_host, sse_port)
# Telnet server
telnet_server = loop.run_until_complete(loop.create_server(Telnet, '192.168.2.25', 7777))
#telnet_server.something = sse_server;
loop.run_until_complete(sse_server)
loop.run_until_complete(telnet_server.wait_closed())
Server side events are a sort of http protocol; and you may have any number of concurrent http requests in flight at any given moment, you may have zero if nobody is connected, or dozens. This nuance is all wrapped up in the two sse.serve
and sse.Handler
constructs; the former represents a single listening port, which dispatches each separate client request to the latter.
Additionally, sse.Handler.handle_request()
is called once for each client, and the client is disconnected once that co-routine terminates. In your code, that coroutine terminates immediately, and so the client sees a single "Working" event. So, we need to wait, more-or-less forever. We can do that by yield from
ing an asyncio.Future()
.
The second issue is that we'll somehow need to get a hold of all of the separate instances of a SSEHandler()
and use the send()
method on each of them, somehow. Well, we can have each one self-register in their handle_request()
methods; by adding each one to a dict which maps the individual handler instances to the future they are waiting on.
class SSEHandler(sse.Handler):
_instances = {}
@asyncio.coroutine
def handle_request(self):
self.send('Working')
my_future = asyncio.Future()
SSEHandler._instances[self] = my_future
yield from my_future
Now, to send an event to every listening we just visit all of the SSEHandler instances registered in the dict we created and using send()
on each one.
class SSEHandler(sse.Handler):
#...
@classmethod
def broadcast(cls, message):
for instance, future in cls._instances.items():
instance.send(message)
class Telnet(asyncio.Protocol):
#...
def data_received(self, data):
#...
SSEHandler.broadcast(data.decode('ascii'))
lastly, your code exits when the telnet connection closes. that's fine, but we should clean-up at that time, too. Fortunately, that's just a matter of setting a result on all of the futures for all of the handlers
class SSEHandler(sse.Handler):
#...
@classmethod
def abort(cls):
for instance, future in cls._instances.items():
future.set_result(None)
cls._instances = {}
class Telnet(asyncio.Protocol):
#...
def connection_lost(self, esc):
print("Connection lost!")
SSEHandler.abort()
telnet_server.close()
here's a full, working dump in case my illustration is not obvious.
import asyncio
import sse
loop = asyncio.get_event_loop()
sse_host, sse_port = '0.0.0.0', 8888
class Telnet(asyncio.Protocol):
def connection_made(self, transport):
print("Connection received!");
self.transport = transport
def data_received(self, data):
SSEHandler.broadcast(data.decode('ascii'))
def connection_lost(self, esc):
print("Connection lost!")
SSEHandler.abort()
telnet_server.close()
class SSEHandler(sse.Handler):
_instances = {}
@classmethod
def broadcast(cls, message):
for instance, future in cls._instances.items():
instance.send(message)
@classmethod
def abort(cls):
for instance, future in cls._instances.items():
future.set_result(None)
cls._instances = {}
@asyncio.coroutine
def handle_request(self):
self.send('Working')
my_future = asyncio.Future()
SSEHandler._instances[self] = my_future
yield from my_future
sse_server = sse.serve(SSEHandler, sse_host, sse_port)
telnet_server = loop.run_until_complete(loop.create_server(Telnet, '0.0.0.0', 7777))
loop.run_until_complete(sse_server)
loop.run_until_complete(telnet_server.wait_closed())