龙卷风的WebSocket客户端失去响应消息?(Tornado websocket client l

2019-10-30 06:20发布

我需要处理来自网络摄像头帧,发送一些选定的帧发送到远程服务器的WebSocket。 服务器用确认消息(很像回声服务器)立即回答。 帧处理是缓慢的,CPU密集型,所以我想用一个单独的线程池(生产者)到使用所有可用内核来做到这一点。 因此,客户(消费者)就坐在空闲状态,直到池中有东西送。 我目前的执行情况,请参见下面, 只有当我加入制片测试循环中的小睡眠正常工作。 如果我删除这个延迟我停止从服务器(无论是回声服务器,并从我的真实服务器)得到任何回答。 即使是第一次的答案都将丢失,所以我不认为这是一个防洪保护机制。 我究竟做错了什么?

import tornado
from tornado.websocket import websocket_connect
from tornado import gen, queues

import time

class TornadoClient(object):

    url = None
    onMessageReceived = None
    onMessageSent = None

    ioloop = tornado.ioloop.IOLoop.current()
    q = queues.Queue()

    def __init__(self, url, onMessageReceived, onMessageSent):
        self.url = url
        self.onMessageReceived = onMessageReceived
        self.onMessageSent = onMessageSent

    def enqueueMessage(self, msgData, binary=False):
        print("TornadoClient.enqueueMessage")

        self.ioloop.add_callback(self.addToQueue, (msgData, binary))

        print("TornadoClient.enqueueMessage done")

    @gen.coroutine
    def addToQueue(self, msgTuple):
        yield self.q.put(msgTuple)

    @gen.coroutine
    def main_loop(self):

        connection = None
        try:
            while True:

              while connection is None:
                try:
                    print("Connecting...")
                    connection = yield websocket_connect(self.url)
                    print("Connected " + str(connection))
                except Exception, e:
                    print("Exception on connection " + str(e))
                    connection = None
                    print("Retry in a few seconds...")
                    yield gen.Task(self.ioloop.add_timeout, time.time() + 3)

              try:
                  print("Waiting for data to send...")
                  msgData, binaryVal  = yield self.q.get()
                  print("Writing...")
                  sendFuture = connection.write_message(msgData, binary=binaryVal)
                  print("Write scheduled...")
              finally:
                self.q.task_done()

              yield sendFuture
              self.onMessageSent("Sent ok")

              print("Write done. Reading...")
              msg = yield connection.read_message()
              print("Got msg.")
              self.onMessageReceived(msg)

              if msg is None:
                print("Connection lost")
                connection = None

            print("main loop completed")
        except Exception, e:
            print("ExceptionExceptionException")
            print(e)
            connection = None

        print("Exit main_loop function")

    def start(self):
        self.ioloop.run_sync(self.main_loop)
        print("Main loop completed")


######### TEST METHODS #########

def sendMessages(client):
    time.sleep(2)   #TEST only: wait for client startup
    while True:
        client.enqueueMessage("msgData", binary=False)
        time.sleep(1)  # <--- comment this line to break it

def testPrintMessage(msg):
    print("Received: " + str(msg))

def testPrintSentMessage(msg):
    print("Sent: " + msg)


if __name__=='__main__':

    from threading import Thread

    client = TornadoClient("ws://echo.websocket.org", testPrintMessage, testPrintSentMessage)

    thread = Thread(target = sendMessages, args = (client, ))
    thread.start()

    client.start()

我真正的问题

在我真正的程序中,我使用“窗口像”机制,以保护消费者(的autobahn.twisted.websocket服务器):制片人最多可以发送未确认消息(网络摄像头帧)的最大数量,然后停止等待该窗口的一半,腾出。 消费者发出一个“加工”的消息acknowleding一个或多个消息(只是一个计数器,而不是ID)。 我看到的对消费者日志是信息的处理和答案发回,但这些ACK在网络中的某个地方消失。

我有asynchio一点经验,所以我想确保我不会错过任何产量,注释或别的东西。

这是消费者端日志:

2017-05-13 18:59:54+0200 [-] TX Frame to tcp4:192.168.0.5:48964 : fin = True, rsv = 0, opcode = 1, mask = -, length = 21, repeat_length = None, chopsize = None, sync = False, payload = {"type": "PROCESSED"}
2017-05-13 18:59:54+0200 [-] TX Octets to tcp4:192.168.0.5:48964 : sync = False, octets = 81157b2274797065223a202250524f434553534544227d

Answer 1:

这是整洁的代码。 我相信你需要一个睡在你的理由sendMessages线程是因为,否则,它不断给你打电话enqueueMessage尽可能地快,每秒百万次。 由于enqueueMessage 等待排队的消息进行处理,它不断给你打电话IOLoop.add_callback尽可能快地就可以了,没有给予足够的循环机会来执行回调。

循环可能使主线程上运行的一些进展,因为你没有真正阻止它。 但sendMessages线增加了回调的速度远远超过了循环可以处理它们。 由当时的循环已经从队列弹出一个消息,并已开始对其进行处理,数以百万计的新的回调已经加入,其中环必须执行后,才能进入到信息处理的下一个阶段。

因此,对于你的测试代码,我认为这是正确的调用之间睡觉enqueueMessage的线程上。



文章来源: Tornado websocket client loosing response messages?