I'm new to Tornado and Redis, and I'm implementing the beginnings of a listener/worker setup.
I want to be able to LPUSH tasks onto a queue and RPOP them off. BRPOP seems like the best way to pop them off, as it will wait for one to be added if none are currently there. The problem is, whenever I use it, it never returns... however when I use RPOP I get the next item in the queue as expected.
class ListenHandler(tornado.websocket.WebSocketHandler):
    """WebSocket endpoint that turns client messages into Redis queue jobs.

    Each incoming message must be a JSON object carrying a request id
    (``rid``) and a ``command``.  Recognized commands either answer
    directly (``register``, ``list_command_queue``), push the message onto
    the ``client_read`` / ``client_write`` Redis lists (``get_canvas``,
    ``save_canvas``), or pop the next job from one of those lists
    (``get_read_job``, ``get_write_job``).

    ``self.application.rdb`` is used as a callback-style Redis client:
    every call goes through ``tornado.gen.Task``, which supplies the
    ``callback`` keyword argument.
    """

    uid = ''
    CHANNEL_TPL = "client_%s"
    RESPONSE_TPL = '{"command":"%s","rid":"%s","status":"%s","result":%s}'

    def open(self):
        """Assign a fresh uid when a client opens a websocket connection."""
        print("open socket")
        self.uid = uuid4()

    def on_message(self, message):
        """Validate one client message and dispatch its command.

        Replies ``BAD`` for unparsable JSON, an ``unspecified rid`` error
        when the request id is missing, and otherwise acknowledges with
        ``<rid> OK`` before handling the command.
        """
        print("on_message called [%s]" % message)
        try:
            m = json.loads(message)
        except ValueError:
            self.write_message('BAD')
            return

        # check for RID (request id); a payload that is valid JSON but not
        # an object (e.g. a bare number) cannot carry one either
        if not isinstance(m, dict) or 'rid' not in m:
            self.write_message('error: unspecified rid')
            return
        rid = m['rid']

        # confirm receipt of data
        self.write_message('%s OK' % rid)

        # check for command
        if 'command' not in m:
            self.write_message('%s error: unspecified command' % rid)
            return
        command = m['command']

        # process commands
        if command == 'register':
            self._register(rid)
        elif command == 'get_canvas':
            self._queue_command('read', m)
        elif command == 'save_canvas':
            if 'data' not in m:
                self.write_message('%s unspecified data' % rid)
                return
            self._queue_command('write', m)
        elif command == 'list_command_queue':
            self._list_command_queue(rid)
        elif command == 'get_read_job':
            self._get_read_job(rid)
        elif command == 'get_write_job':
            self._get_write_job(rid)
        else:
            # no commands recognized
            self.write_message('%s error: unknown command' % rid)
            return
        print("end of on_message()\n")

    def callback(self, data):
        """Send ``data`` back to the client over the websocket."""
        print("- callback()")
        self.write_message(data)

    def on_close(self):
        """Called when the client closes the connection; nothing to clean up."""
        pass

    def _register(self, rid):
        """Reply to a ``register`` command with this connection's uid."""
        data = '{"uid":"%s"}' % (self.uid)
        response = self.RESPONSE_TPL % ('register', rid, 'completed', data)
        self.callback(response)

    # NOTE: tornado.web.asynchronous is intentionally not applied to the
    # helpers below; it is meant for HTTP verb methods, not for private
    # methods of a websocket handler.  tornado.gen.engine alone is enough.
    @tornado.gen.engine
    def _queue_command(self, type, m):
        """LPUSH the message ``m`` onto the ``client_<type>`` Redis list.

        The message is serialized with ``json.dumps`` so that consumers
        (and the ``result`` slot of ``RESPONSE_TPL``) receive JSON rather
        than the ``repr`` of a Python dict.  ``type`` shadows the builtin;
        the name is kept so existing callers keep working.
        """
        channel = self.CHANNEL_TPL % (type)
        print("pushing job to %s ... data[%s]" % (channel, m))
        yield tornado.gen.Task(self.application.rdb.lpush, channel, json.dumps(m))

    @tornado.gen.engine
    def _list_command_queue(self, rid):
        """Reply with the current lengths of the read and write queues.

        The lengths are awaited through ``tornado.gen.Task`` like every
        other call on the Redis client, instead of using the immediate
        return value of ``llen``.
        """
        channel_r = self.CHANNEL_TPL % ('read')
        channel_w = self.CHANNEL_TPL % ('write')
        read_len = yield tornado.gen.Task(self.application.rdb.llen, channel_r)
        write_len = yield tornado.gen.Task(self.application.rdb.llen, channel_w)
        data = '{"client_read":"%s","client_write":"%s"}' % (read_len, write_len)
        response = self.RESPONSE_TPL % ('list_command_queue', rid, 'completed', data)
        print("list_command_queue [%s]" % (response))
        self.callback(response)

    def _get_read_job(self, rid):
        """Pop the next job from the read queue and send it to the client."""
        self._pop_job('read', 'get_read_job', rid)

    def _get_write_job(self, rid):
        """Pop the next job from the write queue and send it to the client."""
        self._pop_job('write', 'get_write_job', rid)

    @tornado.gen.engine
    def _pop_job(self, queue, command, rid):
        """RPOP one entry from ``client_<queue>`` and reply under ``command``.

        An empty queue is reported as JSON ``null`` so the response stays
        parseable.

        NOTE(review): replacing RPOP with BRPOP is not a drop-in change.
        Redis's BRPOP takes a trailing timeout argument, and a blocking
        command presumably needs its own connection rather than the shared
        ``application.rdb`` one -- confirm against the Redis client in use.
        """
        channel = self.CHANNEL_TPL % (queue)
        data = yield tornado.gen.Task(self.application.rdb.rpop, channel)
        if data is None:
            data = 'null'
        response = self.RESPONSE_TPL % (command, rid, 'completed', data)
        self.callback(response)
The above class will take recognized commands and LPUSH them into one of two different queues (a 'write' queue for jobs which will write to an SQL DB, and a 'read' queue for jobs that are read-only). Theoretically there will be a number of "worker" threads on a different machine using BRPOP to get these commands and execute them. For now, though, I'm using the same listener to test what's in the queue.
The methods _get_write_job and _get_read_job return the next entry in the queue with no problems. However, if I add a 'b' (i.e. use brpop instead of rpop) in either method, the function never gets to call the callback. It seems to just lock, waiting for the next available entry, even though there are entries in the queue.
Any idea what's going on here? Am I misunderstanding the purpose of BRPOP?
Thanks, Nick