龙卷风的Redis:RPOP作品,但BRPOP不?(tornado-redis: RPOP work

2019-10-16 22:04发布

New to Tornado, and Redis, and implementing the beginnings of a listener / worker setup.

I want to be able to LPUSH tasks onto a queue and RPOP them off. BRPOP seems like the best way to pop them off, as it will wait for one to be added if none are currently there. The problem is, whenever I use it, it never returns... however when I use RPOP I get the next item in the queue as expected.

class ListenHandler(tornado.websocket.WebSocketHandler):
    uid = ''
    CHANNEL_TPL = "client_%s"
    RESPONSE_TPL = '{"command":"%s","rid":"%s","status":"%s","result":%s}'
    def open(self):
        # new websocket connection is established from a client
        print "open socket"
        self.uid = session = uuid4()        

    def on_message(self, message):
        print "on_message called [%s]" % message
        try:
            m = json.loads(message)
        except ValueError:
            self.write_message('BAD')
            return

        # check for RID (request id)
        if not 'rid' in m:
            self.write_message('error: unspecified rid')
            return

        # confirm receipt of data
        confirm_string = '%s OK' % (m['rid'])
        self.write_message(confirm_string)

        # check for command
        if not 'command' in m:
            response = '%s error: unspecified command' % (m['rid'])
            self.write_message(response)
            return

        # process commands
        if m['command'] == 'register':
            self._register(m['rid'])
        elif m['command'] == 'get_canvas':
            self._queue_command('read', m)
        elif m['command'] == 'save_canvas':
            if 'data' in m:
                self._queue_command('write', m)
            else:
                response = '%s unspecified data' % (m['rid'])
                self.write_message(response)
                return
        elif m['command'] == 'list_command_queue':
            self._list_command_queue(m['rid'])
        elif m['command'] == 'get_read_job':
            self._get_read_job(m['rid'])
        elif m['command'] == 'get_write_job':
            self._get_write_job(m['rid'])
        else:
            # no commands recognized
            response = '%s error: unknown command' % (m['rid'])
            self.write_message(response)
            return
        print "end of on_message()\n"

    def callback(self, data):
        print "- callback()"
        self.write_message(data)

    def on_close(self):
        # websocket connection is closed by client
        pass

    def _register(self, rid):
        data = '{"uid":"%s"}' % (self.uid)
        response = self.RESPONSE_TPL % ('register', rid, 'completed', data)
        self.callback(response)

    @tornado.web.asynchronous
    @tornado.gen.engine 
    def _queue_command(self, type, m):
        channel = self.CHANNEL_TPL % (type)
        print "pushing job to %s ... data[%s]" % (channel, m)
        yield tornado.gen.Task(self.application.rdb.lpush, channel, m)
        return

    def _list_command_queue(self, rid):
        channel_r = self.CHANNEL_TPL  % ('read')
        channel_w = self.CHANNEL_TPL  % ('write')
        data = '{"client_read":"%s","client_write":"%s"}' % (self.application.rdb.llen(channel_r), self.application.rdb.llen(channel_w))
        response = self.RESPONSE_TPL % ('list_command_queue', rid, 'completed', data)
        print "list_command_queue [%s]" % (response)
        self.callback(response)

    @tornado.web.asynchronous
    @tornado.gen.engine  
    def _get_read_job(self, rid):
        channel = self.CHANNEL_TPL % ('read')
        data = yield tornado.gen.Task(self.application.rdb.rpop, (channel))
        response = self.RESPONSE_TPL % ('get_read_job', rid, 'completed', data)
        self.callback(response)

    @tornado.web.asynchronous
    @tornado.gen.engine    
    def _get_write_job(self, rid):
        channel = self.CHANNEL_TPL  % ('write')
        data = yield tornado.gen.Task(self.application.rdb.rpop, (channel))
        response = self.RESPONSE_TPL % ('get_write_job', rid, 'completed', data)
        self.callback(response) 

The above class will take recognized commands and LPUSH them into one of two different queues (a 'write' queue for jobs which will write to an SQL DB, and a 'read' queue for jobs that are read-only). Theoretically there will be a number of "worker" threads on a different machine using BRPOP to get these commands and execute them. For now, though, I'm using the same listener to test whats in the queue.

The functions get_write_jobs and get_read_jobs will return the next entry in the queue, no problems. However if I ad a 'b' (brpop) to either method, the function never gets to call the callback. Seems to just lock, waiting for the next available entry, but there are entries in there.

Any idea what's going on here? Am I misunderstanding the purpose of BRPOP?

Thanks, Nick

Answer 1:

也许这是一个有点晚了,但我想我可以提供帮助。

今天我有这个同样的问题,它几乎要把我逼疯了。 在结束时,大量的调试后我已经通过使键或信道到BLPOP / brpop列表内解决了这个问题(例如,[频道])。

什么龙卷风Redis的brpop / BLPOP内部做的就是将键转换到一个列表,但在接收到只需一键,将字符串转换为字符(简直太神奇了......)的列表,这就是为什么调用块之后,它正在等待其名称对应于原始密钥的所有字符各种列表的新项目。



文章来源: tornado-redis: RPOP works but BRPOP doesn't?