I'm putting together a client-server app on a Raspberry Pi. The main thread creates a comms thread to talk to an iOS device. It also creates an asyncio event loop, a sendQ and a recvQ, and passes them as args to the commsDelegate function, which runs in the comms thread.
The trouble I'm having is that once the iOS device connects, it needs to receive unsolicited data from this Python app as soon as the data becomes available, and it also needs to be able to send data up to the Python app. So both send and receive need to be non-blocking.
There are great echo server tutorials out there, but little that shows the server doing something useful with the data.
Can anyone assist me in getting asyncio to read my send queue and forward data as soon as the main thread has queued it? I have receive working great.
The main thread creates a loop and starts the comms thread:
commsLoop = asyncio.new_event_loop()
commsMainThread = threading.Thread(target=CommsDelegate.commsDelegate, args=(commsInQ,commsOutQ,commsLoop,commsPort,), daemon=True)
commsMainThread.start()
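One thing worth noting about this handoff: asyncio.Queue is not thread-safe, so a put from the main thread has to be scheduled onto the loop's own thread. Below is a minimal sketch of that idea, assuming the send queue is an asyncio.Queue. The names run_loop, demo_threadsafe_put, make_queue and consume are hypothetical and only there for illustration; they are not part of my app.

```python
import asyncio
import threading


def run_loop(loop):
    # Runs in the comms thread: own the loop and serve until stopped.
    asyncio.set_event_loop(loop)
    loop.run_forever()


def demo_threadsafe_put(messages):
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=run_loop, args=(loop,), daemon=True)
    thread.start()

    async def make_queue():
        # Create the queue inside the loop so it is tied to that loop.
        return asyncio.Queue()

    send_q = asyncio.run_coroutine_threadsafe(make_queue(), loop).result(timeout=5)
    received = []

    async def consume(count):
        # Stand-in for "write to the client": just collect what arrives.
        for _ in range(count):
            received.append(await send_q.get())

    done = asyncio.run_coroutine_threadsafe(consume(len(messages)), loop)

    # Main thread side: never call send_q.put_nowait() directly from here,
    # ask the loop to do it instead.
    for message in messages:
        loop.call_soon_threadsafe(send_q.put_nowait, message)

    done.result(timeout=5)
    loop.call_soon_threadsafe(loop.stop)
    thread.join(timeout=5)
    loop.close()
    return received
```

The same loop.call_soon_threadsafe(loop.stop) call is also one way for the main thread to ask the comms thread to shut down.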
Then asyncio in the CommsDelegate module should run the loop with loop.run_forever() as a server task, reading from and writing to a socket stream, and exchanging messages with the main thread through the queues.
Here's my code so far. I found that if I create a factory for the protocol, I can pass it the queues, so receipt of messages is all good now. When messages arrive from the client they are queued with put_nowait and the main thread receives them just fine.
I just need asyncio to handle the queue of outbound messages from the Main thread as they arrive on sendQ, so it can send them on to the connected client.
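To make the goal concrete, here is a rough sketch of the behaviour I'm after, not a confirmed solution: each connection starts its own task that waits on the send queue and writes to that connection's transport. It assumes a single connected client (with several clients, a shared queue would hand each message to only one of them), string messages, and newline termination. PushProtocol, pump, demo and run_demo are hypothetical names, and the demo uses 127.0.0.1 with an OS-chosen port instead of my real address.

```python
import asyncio


class PushProtocol(asyncio.Protocol):
    """Hypothetical protocol that forwards everything on send_q to the client."""

    def __init__(self, recv_q, send_q):
        self.recv_q = recv_q
        self.send_q = send_q
        self.transport = None
        self.writer_task = None

    def connection_made(self, transport):
        self.transport = transport
        # Start draining the outbound queue for this connection.
        self.writer_task = asyncio.ensure_future(self.pump())

    async def pump(self):
        while True:
            message = await self.send_q.get()
            self.transport.write(message.encode() + b"\n")

    def data_received(self, data):
        self.recv_q.put_nowait(data.decode().rstrip())

    def connection_lost(self, exc):
        # Stop the writer task once the client has gone away.
        if self.writer_task is not None:
            self.writer_task.cancel()


async def demo():
    loop = asyncio.get_event_loop()
    recv_q, send_q = asyncio.Queue(), asyncio.Queue()
    server = await loop.create_server(
        lambda: PushProtocol(recv_q, send_q), "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    # A throwaway client standing in for the iOS device.
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    await send_q.put("hello")  # unsolicited server-to-client push
    pushed = await asyncio.wait_for(reader.readline(), timeout=5)
    writer.write(b"ping\n")  # client-to-server message
    await writer.drain()
    received = await asyncio.wait_for(recv_q.get(), timeout=5)

    writer.close()
    await asyncio.sleep(0.1)  # let the server side notice the disconnect
    server.close()
    return pushed, received


def run_demo():
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(demo())
    finally:
        loop.close()
```

Here the queue is filled from inside the loop; when the data comes from the main thread, the put would still have to be scheduled onto the loop's thread.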
#!/usr/bin/env python3.6
import asyncio


class ServerProtocol(asyncio.Protocol):
    def __init__(self, loop, recvQ, sendQ):
        self.loop = loop
        self.recvQ = recvQ
        self.sendQ = sendQ

    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        message = data.decode()
        print('Data received: {!r}'.format(message))
        self.recvQ.put_nowait(message.rstrip())

    # Needs work... I think the queue.get_nowait should be a co-ro maybe?
    def unknownAtTheMo(self):
        dataToSend = self.sendQ.get_nowait()
        print('Send: {!r}'.format(dataToSend))
        self.transport.write(dataToSend.encode('ascii'))

    # Needs work to close on request from client or server or exc...
    def handleCloseSocket(self):
        print('Close the client socket')
        self.transport.close()
# async co-routine to consume the send message Q from Main Thread
async def consume(sendQ):
    print("In consume coro")
    while True:
        outboundData = await sendQ.get()
        print("Consumed", outboundData)
        # Still unsolved: self is undefined at module level, so this coroutine
        # has no way to reach the connected client's transport.
        self.transport.write(outboundData.encode('ascii'))
def commsDelegate(recvQ, sendQ, loop, port):
    asyncio.set_event_loop(loop)
    # Create a factory so the protocol receives the queues as args
    factory = lambda: ServerProtocol(loop, recvQ, sendQ)
    # Start listening; each client connection will create a new protocol instance.
    # run_until_complete() already returns the Server object, so it is not run again.
    connection = loop.run_until_complete(loop.create_server(factory, host='192.168.1.199', port=port))
    # Outgoing message queue handler
    consumer = asyncio.ensure_future(consume(sendQ), loop=loop)
    # Serve until the loop is stopped
    loop.run_forever()
    # Wait until the queue is empty
    loop.run_until_complete(sendQ.join())
    # Cancel the consumer
    consumer.cancel()
    # Let the consumer terminate
    try:
        loop.run_until_complete(consumer)
    except asyncio.CancelledError:
        pass
    # Close the connection
    connection.close()
    # Close the loop
    loop.close()
I send all data messages as JSON; CommsDelegate performs the encode and decode, then relays them as is.
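For the JSON step, this is roughly the shape of what I mean, as a sketch only. It assumes one JSON document per line (newline-delimited), which is my assumption for the example rather than a fixed part of the protocol. encode_message and LineDecoder are hypothetical helper names. The buffer is there because data_received can deliver half a message or several messages in a single call.

```python
import json


def encode_message(obj):
    # json.dumps never emits a raw newline, so "\n" is a safe terminator.
    return (json.dumps(obj) + "\n").encode("utf-8")


class LineDecoder:
    """Collects raw bytes and yields complete JSON messages."""

    def __init__(self):
        self._buffer = b""

    def feed(self, data):
        # Keep any trailing partial line in the buffer for the next call.
        self._buffer += data
        *lines, self._buffer = self._buffer.split(b"\n")
        return [json.loads(line.decode("utf-8")) for line in lines if line.strip()]
```

In the protocol, data_received would pass its bytes to feed() and queue each decoded object, and the outbound side would call encode_message() before transport.write().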
Update: The asyncio thread seems to be working well for incoming traffic. The server receives JSON and relays it via a queue without blocking.
Once the send is working, I'll have a reusable black-box server on a thread.