
Tornado websocket client losing response messages

Posted 2019-09-13 03:02

Question:

I need to process frames from a webcam and send a few selected frames to a remote websocket server. The server answers immediately with a confirmation message (much like an echo server). Frame processing is slow and CPU-intensive, so I want to run it in a separate thread pool (the producer) to use all the available cores. The client (the consumer) just sits idle until the pool has something to send. My current implementation, shown below, works only if I add a small sleep inside the producer test loop. If I remove this delay, I stop receiving any answer from the server (both the echo server and my real server). Even the first answer is lost, so I do not think this is a flood-protection mechanism. What am I doing wrong?

import tornado.ioloop
from tornado.websocket import websocket_connect
from tornado import gen, queues

import time

class TornadoClient(object):

    url = None
    onMessageReceived = None
    onMessageSent = None

    ioloop = tornado.ioloop.IOLoop.current()
    q = queues.Queue()

    def __init__(self, url, onMessageReceived, onMessageSent):
        self.url = url
        self.onMessageReceived = onMessageReceived
        self.onMessageSent = onMessageSent

    def enqueueMessage(self, msgData, binary=False):
        print("TornadoClient.enqueueMessage")

        self.ioloop.add_callback(self.addToQueue, (msgData, binary))

        print("TornadoClient.enqueueMessage done")

    @gen.coroutine
    def addToQueue(self, msgTuple):
        yield self.q.put(msgTuple)

    @gen.coroutine
    def main_loop(self):

        connection = None
        try:
            while True:

              while connection is None:
                try:
                    print("Connecting...")
                    connection = yield websocket_connect(self.url)
                    print("Connected " + str(connection))
                except Exception as e:
                    print("Exception on connection " + str(e))
                    connection = None
                    print("Retry in a few seconds...")
                    # gen.sleep (Tornado 4.1+) replaces the gen.Task/add_timeout idiom
                    yield gen.sleep(3)

              try:
                  print("Waiting for data to send...")
                  msgData, binaryVal  = yield self.q.get()
                  print("Writing...")
                  sendFuture = connection.write_message(msgData, binary=binaryVal)
                  print("Write scheduled...")
              finally:
                self.q.task_done()

              yield sendFuture
              self.onMessageSent("Sent ok")

              print("Write done. Reading...")
              msg = yield connection.read_message()
              print("Got msg.")
              self.onMessageReceived(msg)

              if msg is None:
                print("Connection lost")
                connection = None

            print("main loop completed")
        except Exception as e:
            print("ExceptionExceptionException")
            print(e)
            connection = None

        print("Exit main_loop function")

    def start(self):
        self.ioloop.run_sync(self.main_loop)
        print("Main loop completed")


######### TEST METHODS #########

def sendMessages(client):
    time.sleep(2)   #TEST only: wait for client startup
    while True:
        client.enqueueMessage("msgData", binary=False)
        time.sleep(1)  # <--- comment this line to break it

def testPrintMessage(msg):
    print("Received: " + str(msg))

def testPrintSentMessage(msg):
    print("Sent: " + msg)


if __name__=='__main__':

    from threading import Thread

    client = TornadoClient("ws://echo.websocket.org", testPrintMessage, testPrintSentMessage)

    thread = Thread(target = sendMessages, args = (client, ))
    thread.start()

    client.start()

My real problem

In my real program I use a "window-like" mechanism to protect the consumer (an autobahn.twisted.websocket server): the producer can send up to a maximum number of unacknowledged messages (the webcam frames), then stops and waits for half of the window to free up. The consumer sends back a "PROCESSED" message acknowledging one or more messages (just a counter, not by id). What I see in the consumer log is that the messages are processed and the answer is sent back, but these acks vanish somewhere in the network.
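The window mechanism described above could be sketched roughly as follows. This is only an illustration under assumptions: the class name `AckWindow`, its methods, and the use of `threading.Condition` are hypothetical and are not taken from the real program.

```python
import threading


class AckWindow(object):
    """Hypothetical helper: limits the number of unacknowledged messages."""

    def __init__(self, size):
        self.size = size          # maximum number of unacknowledged messages
        self.unacked = 0          # messages sent but not yet acknowledged
        self._draining = False    # True from "window full" until half is free
        self._cond = threading.Condition()

    def acquire(self):
        """Call before sending a message; blocks while the window is draining."""
        with self._cond:
            self._cond.wait_for(lambda: not self._draining)
            self.unacked += 1
            if self.unacked >= self.size:
                # Window is full: stop sending until half of it frees up.
                self._draining = True

    def ack(self, count=1):
        """Call when a "PROCESSED" message acknowledges `count` messages."""
        with self._cond:
            self.unacked = max(0, self.unacked - count)
            if self.unacked <= self.size // 2:
                self._draining = False
                self._cond.notify_all()
```

In this sketch the producer would call `acquire()` before each send, and the code that reads the "PROCESSED" replies would call `ack(n)` with the counter carried by the reply.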

I have little experience with asynchronous I/O, so I wanted to be sure that I'm not missing a yield, a decorator or something else.

This is the consumer side log:

2017-05-13 18:59:54+0200 [-] TX Frame to tcp4:192.168.0.5:48964 : fin = True, rsv = 0, opcode = 1, mask = -, length = 21, repeat_length = None, chopsize = None, sync = False, payload = {"type": "PROCESSED"}
2017-05-13 18:59:54+0200 [-] TX Octets to tcp4:192.168.0.5:48964 : sync = False, octets = 81157b2274797065223a202250524f434553534544227d

Answer 1:

This is neat code. I believe the reason you need a sleep in your sendMessages thread is that, without it, the thread keeps calling enqueueMessage as fast as possible, millions of times per second. Since enqueueMessage does not wait for the enqueued message to be processed, it keeps calling IOLoop.add_callback as fast as it can, without giving the loop enough opportunity to execute the callbacks.

The loop might make some progress running on the main thread, since you're not actually blocking it. But the sendMessages thread adds callbacks much faster than the loop can handle them. By the time the loop has popped one message from the queue and begun to process it, millions of new callbacks have already been added, and the loop must execute them before it can advance to the next stage of message processing.

Therefore, for your test code, I think it's correct to sleep between calls to enqueueMessage on the thread.
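If a fixed sleep is not desirable, one alternative is to make the producer thread wait until each message has actually been accepted by a bounded queue, so it can never get ahead of the loop. The sketch below is only an illustration under assumptions: it uses the standard library's asyncio (asyncio.Queue and asyncio.run_coroutine_threadsafe) instead of Tornado's queues.Queue and IOLoop.add_callback, and the names producer, consume and maxsize=4 are made up for the example.

```python
import asyncio


def producer(loop, q, count):
    """Runs on a worker thread; blocks whenever the bounded queue is full."""
    for i in range(count):
        # Schedule q.put() on the loop thread, then wait until it has run.
        asyncio.run_coroutine_threadsafe(q.put(i), loop).result()
    # A None sentinel tells the consumer that no more messages will come.
    asyncio.run_coroutine_threadsafe(q.put(None), loop).result()


async def consume(count, maxsize=4):
    """Runs on the event loop; drains the queue until the sentinel arrives."""
    loop = asyncio.get_running_loop()
    q = asyncio.Queue(maxsize=maxsize)
    # Start the producer on a thread from the loop's default executor.
    producer_done = loop.run_in_executor(None, producer, loop, q, count)
    received = []
    while True:
        item = await q.get()
        if item is None:
            break
        received.append(item)
        # Stand-in for the real write_message / read_message round trip.
        await asyncio.sleep(0)
    await producer_done
    return received


if __name__ == "__main__":
    print(asyncio.run(consume(10)))
```

Because the queue is bounded and the thread waits on the result of each put, at most maxsize messages are pending at any time, which gives the loop room to send each message and read its reply.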