我需要处理来自网络摄像头帧,发送一些选定的帧发送到远程服务器的WebSocket。 服务器用确认消息(很像回声服务器)立即回答。 帧处理是缓慢的,CPU密集型,所以我想用一个单独的线程池(生产者)到使用所有可用内核来做到这一点。 因此,客户(消费者)就坐在空闲状态,直到池中有东西送。 我目前的执行情况,请参见下面, 只有当我加入制片测试循环中的小睡眠正常工作。 如果我删除这个延迟我停止从服务器(无论是回声服务器,并从我的真实服务器)得到任何回答。 即使是第一次的答案都将丢失,所以我不认为这是一个防洪保护机制。 我究竟做错了什么?
import tornado
from tornado.websocket import websocket_connect
from tornado import gen, queues
import time
class TornadoClient(object):
url = None
onMessageReceived = None
onMessageSent = None
ioloop = tornado.ioloop.IOLoop.current()
q = queues.Queue()
def __init__(self, url, onMessageReceived, onMessageSent):
self.url = url
self.onMessageReceived = onMessageReceived
self.onMessageSent = onMessageSent
def enqueueMessage(self, msgData, binary=False):
print("TornadoClient.enqueueMessage")
self.ioloop.add_callback(self.addToQueue, (msgData, binary))
print("TornadoClient.enqueueMessage done")
@gen.coroutine
def addToQueue(self, msgTuple):
yield self.q.put(msgTuple)
@gen.coroutine
def main_loop(self):
connection = None
try:
while True:
while connection is None:
try:
print("Connecting...")
connection = yield websocket_connect(self.url)
print("Connected " + str(connection))
except Exception, e:
print("Exception on connection " + str(e))
connection = None
print("Retry in a few seconds...")
yield gen.Task(self.ioloop.add_timeout, time.time() + 3)
try:
print("Waiting for data to send...")
msgData, binaryVal = yield self.q.get()
print("Writing...")
sendFuture = connection.write_message(msgData, binary=binaryVal)
print("Write scheduled...")
finally:
self.q.task_done()
yield sendFuture
self.onMessageSent("Sent ok")
print("Write done. Reading...")
msg = yield connection.read_message()
print("Got msg.")
self.onMessageReceived(msg)
if msg is None:
print("Connection lost")
connection = None
print("main loop completed")
except Exception, e:
print("ExceptionExceptionException")
print(e)
connection = None
print("Exit main_loop function")
def start(self):
self.ioloop.run_sync(self.main_loop)
print("Main loop completed")
######### TEST METHODS #########
def sendMessages(client):
time.sleep(2) #TEST only: wait for client startup
while True:
client.enqueueMessage("msgData", binary=False)
time.sleep(1) # <--- comment this line to break it
def testPrintMessage(msg):
print("Received: " + str(msg))
def testPrintSentMessage(msg):
print("Sent: " + msg)
if __name__=='__main__':
from threading import Thread
client = TornadoClient("ws://echo.websocket.org", testPrintMessage, testPrintSentMessage)
thread = Thread(target = sendMessages, args = (client, ))
thread.start()
client.start()
我真正的问题
在我真正的程序中,我使用“窗口像”机制,以保护消费者(的autobahn.twisted.websocket服务器):制片人最多可以发送未确认消息(网络摄像头帧)的最大数量,然后停止等待该窗口的一半,腾出。 消费者发出一个“加工”的消息acknowleding一个或多个消息(只是一个计数器,而不是ID)。 我看到的对消费者日志是信息的处理和答案发回,但这些ACK在网络中的某个地方消失。
我有asynchio一点经验,所以我想确保我不会错过任何产量,注释或别的东西。
这是消费者端日志:
2017-05-13 18:59:54+0200 [-] TX Frame to tcp4:192.168.0.5:48964 : fin = True, rsv = 0, opcode = 1, mask = -, length = 21, repeat_length = None, chopsize = None, sync = False, payload = {"type": "PROCESSED"}
2017-05-13 18:59:54+0200 [-] TX Octets to tcp4:192.168.0.5:48964 : sync = False, octets = 81157b2274797065223a202250524f434553534544227d