Python 3.4 Comms Stream Delegate - Non-blocking receive and send

Posted 2019-09-09 16:46

I'm putting together a client-server app on an RPi. It has a main thread which creates a comms thread to talk to an iOS device. The main thread creates an asyncio event loop plus a sendQ and a recvQ, and passes them as arguments to the commsDelegate main method in the comms thread.

The trouble I'm having is that when the iOS device connects, it needs to receive unsolicited data from this Python app as soon as the data becomes available, and it needs to be able to send data up to the Python app. So send and receive need to be non-blocking.

There are great echo server tutorials out there, but little that shows the server doing something useful with the data.

Can anyone assist me in getting asyncio to read my send queue and forward data as soon as the main thread has queued it? I have receive working great.

Main Thread creates a loop and starts the comms thread:

  commsLoop = asyncio.new_event_loop()
  commsMainThread = threading.Thread(target=CommsDelegate.commsDelegate, args=(commsInQ,commsOutQ,commsLoop,commsPort,), daemon=True)
  commsMainThread.start()

Then asyncio, in the CommsDelegate module, should run the loop with loop.run_forever(): a server task reading from and writing to a socket stream, and passing messages back up to the main thread via the queues.

Here's my code so far. I found that if I create a factory for the protocol generator, I can pass it the queues, so receipt of messages is all good now: when they arrive from the client they are queued with put_nowait and the main thread receives them just fine.

I just need asyncio to handle the queue of outbound messages from the Main thread as they arrive on sendQ, so it can send them on to the connected client.

#!/usr/bin/env python3.6
import asyncio

class ServerProtocol(asyncio.Protocol):

  def __init__(self, loop, recvQ, sendQ):
    self.loop = loop
    self.recvQ = recvQ
    self.sendQ = sendQ

  def connection_made(self, transport):
    peername = transport.get_extra_info('peername')
    print('Connection from {}'.format(peername))
    self.transport = transport

  def data_received(self, data):
    message = data.decode()
    print('Data received: {!r}'.format(message))
    self.recvQ.put_nowait(message.rstrip())

  # Needs work... I think the queue.get_nowait should be a co-ro maybe? 
  def unknownAtTheMo(self):
    dataToSend = self.sendQ.get_nowait()
    print('Send: {!r}'.format(dataToSend))
    self.transport.write(dataToSend)

  # Needs work to close on request from client or server or exc...
  def handleCloseSocket(self):
    print('Close the client socket')
    self.transport.close()

# async co-routine to consume the send message Q from Main Thread
async def consume(sendQ):
  print("In consume coro")
  while True:
    outboundData = await self.sendQ.get()
    print("Consumed", outboundData)
    self.transport.write(outboundData.encode('ascii'))

def commsDelegate(recvQ, sendQ, loop, port):
  asyncio.set_event_loop(loop)

  # Connection coroutine - Create a factory to assist the protocol in receipt of the queues as args
  factory = lambda: ServerProtocol(loop, recvQ, sendQ)
  # Each client connection will create a new protocol instance
  connection = loop.run_until_complete(loop.create_server(factory, host='192.168.1.199', port=port))

  # Outgoing message queue handler
  consumer = asyncio.ensure_future(consume(sendQ))

  # Set up connection
  loop.run_until_complete(connection)

  # Wait until the connection is closed
  loop.run_forever()

  # Wait until the queue is empty
  loop.run_until_complete(sendQ.join())

  # Cancel the consumer
  consumer.cancel()

  # Let the consumer terminate
  loop.run_until_complete(consumer)

  # Close the connection
  connection.close()

  # Close the loop
  loop.close()

I send all data messages as JSON; CommsDelegate performs encode and decode and then relays them as-is.

Update: the asyncio thread seems to be working well for incoming traffic. The server receives JSON and relays it via a queue, non-blocking.

Once the send is working, I'll have a reusable blackbox server on a thread.

1 Answer
Bombasti
Answered 2019-09-09 17:42

I can see two problems with your approach. First, all your clients share the same recv and send queues, so the consume coroutine has no way of knowing which client to reply to.
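One way around this (a sketch only, not taken from your code; the outQ and senderTask names are illustrative) is to give each protocol instance its own outbound queue and start a sender task in connection_made, so each reply is written to the transport of the connection it belongs to:

import asyncio

class ServerProtocol(asyncio.Protocol):

  def __init__(self, loop, recvQ):
    self.loop = loop
    self.recvQ = recvQ             # the inbound queue can stay shared
    self.outQ = asyncio.Queue()    # one outbound queue per connection

  def connection_made(self, transport):
    self.transport = transport
    # start a sender task bound to this connection's transport
    self.senderTask = self.loop.create_task(self._sender())

  async def _sender(self):
    while True:
      message = await self.outQ.get()
      self.transport.write(message.encode())

  def connection_lost(self, exc):
    self.senderTask.cancel()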

The second issue has to do with your use of queues as a bridge between the synchronous and the asynchronous worlds. See this part of your code:

await self.sendQ.get()

If sendQ is a regular queue (from the queue module), this line will fail because sendQ.get() is not a coroutine and cannot be awaited. On the other hand, if sendQ is an asyncio.Queue, the main thread won't be able to call sendQ.put directly because it is a coroutine. It would be possible to use put_nowait, but asyncio objects are not thread-safe, so calling it from another thread is not guaranteed to work. Instead, you'd have to use loop.call_soon_threadsafe:

loop.call_soon_threadsafe(sendQ.put_nowait, message)
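Put together, a minimal sketch of the bridge might look like this, assuming sendQ is an asyncio.Queue created inside the comms thread and commsLoop is the loop from your setup (message and transport are placeholders):

# In the main (synchronous) thread: hand the message over to the loop's thread.
commsLoop.call_soon_threadsafe(sendQ.put_nowait, message)

# In the comms (asyncio) thread: drain the queue and write to this client's transport.
async def consume(sendQ, transport):
  while True:
    outboundData = await sendQ.get()
    transport.write(outboundData.encode('ascii'))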

In general, remember that asyncio is designed to run as the main application. It's supposed to run in the main thread, and communicate with synchronous code through a ThreadPoolExecutor (see loop.run_in_executor).
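Alternatively, if you want to keep a plain queue.Queue so the main thread can use blocking put/get, a sketch of the run_in_executor approach is to run the blocking get() on a worker thread so the event loop itself never blocks (names here are illustrative):

import queue

sendQ = queue.Queue()   # ordinary thread-safe queue shared with the main thread

async def consume(loop, sendQ, transport):
  while True:
    # sendQ.get() blocks, so run it in the default ThreadPoolExecutor
    outboundData = await loop.run_in_executor(None, sendQ.get)
    transport.write(outboundData.encode('ascii'))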

There is more information about multithreading in the asyncio documentation. You might also want to have a look at the asyncio stream API, which provides a much nicer interface for working with TCP.
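For comparison, here is a rough sketch of the same kind of server written against the stream API (Python 3.5+ syntax; the host, port and echo handling are placeholders). Each client gets its own reader/writer pair, so replies naturally go to the right peer:

import asyncio

async def handle_client(reader, writer):
  while True:
    data = await reader.readline()
    if not data:                   # client closed the connection
      break
    print('Received:', data.decode().rstrip())
    writer.write(data)             # echo back; replace with real handling
    await writer.drain()
  writer.close()

loop = asyncio.get_event_loop()
server = loop.run_until_complete(
  asyncio.start_server(handle_client, '0.0.0.0', 8888))
loop.run_forever()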
