I'm trying to find out how I can use Redis and Tornado asynchronously. I found tornado-redis, but I need more than just adding a yield in the code.
I have the following code:
import redis
import tornado.web
class WaiterHandler(tornado.web.RequestHandler):
    """Long-poll handler that relays data published on ``test_channel``.

    It uses the synchronous redis client, so iterating ``listen()`` inside
    the handler keeps the Tornado IO loop busy and no other request can be
    served while this one is waiting.
    """

    @tornado.web.asynchronous
    def get(self):
        connection = redis.StrictRedis(port=6279)
        subscription = connection.pubsub()
        subscription.subscribe('test_channel')
        # NOTE(review): the paste lost its indentation; the write/finish
        # calls are assumed to follow the loop rather than sit inside it.
        for item in subscription.listen():
            # skip subscribe confirmations and other non-message events
            if item['type'] != 'message':
                continue
            print(item['channel'])
            print(item['data'])
        self.write(item['data'])
        self.finish()
class GetHandler(tornado.web.RequestHandler):
    """Answer GET with a fixed greeting."""

    def get(self):
        greeting = "Hello world"
        self.write(greeting)
# routing: "/" answers immediately, "/wait" long-polls on Redis
application = tornado.web.Application(
    handlers=[
        (r"/", GetHandler),
        (r"/wait", WaiterHandler),
    ],
)

if __name__ == '__main__':
    application.listen(8888)
    print('running')
    # start() blocks here and serves requests until the process is stopped
    io_loop = tornado.ioloop.IOLoop.instance()
    io_loop.start()
I need to be able to access the / URL and get the "Hello world" response while there's a request pending on /wait.
How can I do it?
You should not use Redis pub/sub in the main Tornado thread, as it will block the IO loop. You can handle the long polling from web clients in the main thread, but you should create a separate thread for listening to Redis. You can then use ioloop.add_callback() and/or a thread-safe queue (Queue.Queue in Python 2, queue.Queue in Python 3; there is no threading.Queue) to communicate with the main thread when you receive messages.
You need to use a Tornado IOLoop-compatible Redis client.
There are a few of them available: toredis, brukva, etc.
Here's pubsub example in toredis: https://github.com/mrjoes/toredis/blob/master/tests/test_handler.py
For Python >= 3.3, I would advise you to use aioredis.
I did not test the code below, but it should be something like this:
import asyncio

import aioredis
import redis
import tornado.httpserver
import tornado.ioloop
import tornado.web
from aioredis.pubsub import Receiver
from tornado.ioloop import IOLoop
from tornado.web import RequestHandler
class WaiterHandler(tornado.web.RequestHandler):
    """Long-poll handler: reply with the next message on ``test_channel``.

    ``get`` is a native coroutine, so while it awaits Redis the IO loop
    stays free to serve other requests. Tornado finishes the response when
    the coroutine returns, so no explicit ``finish()`` call is needed.
    """

    async def get(self):
        # NOTE(review): the original referenced an undefined ``host``;
        # 'localhost' is assumed here -- adjust to your deployment.
        client = await aioredis.create_redis(
            ('localhost', 6279),
            encoding="utf-8",
            loop=IOLoop.instance().asyncio_loop,
        )
        result = None
        try:
            # subscribe() returns one Channel object per requested channel
            channel, = await client.subscribe('test_channel')
            # wait_message() resolves to False if the channel gets closed
            # before anything is published
            if await channel.wait_message():
                # aioredis hands back the payload itself, not a dict
                result = await channel.get(encoding="utf-8")
                print(channel.name)
                print(result)
        finally:
            # always release the connection, even if the wait fails
            client.close()
            await client.wait_closed()
        if result is not None:
            self.write(result)
class GetHandler(tornado.web.RequestHandler):
    """Answer GET with a fixed greeting."""

    def get(self):
        greeting = "Hello world"
        self.write(greeting)
# routing: "/" answers immediately, "/wait" long-polls on Redis
application = tornado.web.Application([
    (r"/", GetHandler),
    (r"/wait", WaiterHandler),
])

if __name__ == '__main__':
    # print() call form: this file needs Python 3 (it uses await), where
    # the print statement is a syntax error
    print('running')
    # select the asyncio-backed loop before any IOLoop is created
    tornado.ioloop.IOLoop.configure('tornado.platform.asyncio.AsyncIOLoop')
    server = tornado.httpserver.HTTPServer(application)
    server.bind(8888)
    # zero means creating as many processes as there are cores.
    server.start(0)
    tornado.ioloop.IOLoop.instance().start()
Okay, so here's my example of how I would do it with GET requests.
I added two main components:
The first is a simple threaded pubsub listener which appends new messages into a local list object.
I also added list accessors to the class, so you can read from the listener thread as if you were reading from a regular list. As far as your WebRequest is concerned, you're just reading data from a local list object. This returns immediately and doesn't block the current request from completing or future requests from being accepted and processed.
class OpenChannel(threading.Thread):
    """Background thread that collects messages published on one Redis channel.

    The constructor subscribes to ``channel``; once the thread is started,
    ``run`` appends the payload of every published message to ``self.output``.
    The list-style accessors below read ``self.output`` under ``self.lock``.

    channel -- name of the Redis channel to subscribe to
    host    -- Redis host, 'localhost' when omitted
    port    -- Redis port, 6379 when omitted
    """

    def __init__(self, channel, host = None, port = None):
        threading.Thread.__init__(self)
        self.lock = threading.Lock()
        self.redis = redis.StrictRedis(host = host or 'localhost', port = port or 6379)
        self.pubsub = self.redis.pubsub()
        self.pubsub.subscribe(channel)
        self.output = []

    # lets implement basic getter methods on self.output, so you can access it like a regular list
    def __getitem__(self, item):
        """Return ``self.output[item]``; ``item`` may be an index or a slice."""
        with self.lock:
            return self.output[item]

    def __getslice__(self, start, stop = None, step = None):
        """Python 2 slice hook; Python 3 routes slices through __getitem__."""
        with self.lock:
            return self.output[start:stop:step]

    def __str__(self):
        with self.lock:
            return str(self.output)

    # thread loop
    def run(self):
        """Store the payload of each published message until unsubscribed."""
        for message in self.pubsub.listen():
            # listen() also yields subscribe/unsubscribe confirmations whose
            # 'data' is a subscription count, not a payload -- skip those
            if message['type'] != 'message':
                continue
            with self.lock:
                self.output.append(message['data'])

    def stop(self):
        """Ask the listener loop to end so that join() can return.

        Unsubscribing makes the server send a confirmation; once no
        subscriptions remain, listen() stops iterating and run() returns.
        This replaces the private Thread._Thread__stop() hook, which does
        not exist on Python 3 and never actually ended the loop.
        """
        self.pubsub.unsubscribe()
The second is the ApplicationMixin class. This is a secondary class that your web request class inherits from in order to gain extra functionality and attributes. In this case it checks whether a channel listener already exists for the requested channel, creates one if none was found, and returns the listener handle to the WebRequest.
# mixin for request handlers: hands out one shared listener per channel,
# creating and starting it the first time that channel is requested
class ApplicationMixin(object):
    def GetChannel(self, channel, host = None, port = None):
        """Return the listener for ``channel``, creating it on first use.

        Listeners are kept in ``self.application.channels``; ``host`` and
        ``port`` are only used when a new listener has to be created.
        """
        registry = self.application.channels
        if channel in registry:
            return registry[channel]
        listener = OpenChannel(channel, host, port)
        registry[channel] = listener
        listener.start()
        return listener
The WebRequest class now treats the listener as if it were a static list (bearing in mind that you need to give self.write a string):
class ReadChannel(tornado.web.RequestHandler, ApplicationMixin):
    """Return everything received so far on the requested channel.

    The handler is fully synchronous: it reads the listener's list and
    returns, and Tornado finishes the response on its own. The
    @tornado.web.asynchronous decorator and the explicit finish() call were
    therefore redundant (and the decorator no longer exists in Tornado 6).
    """

    def get(self, channel):
        # get the channel
        listener = self.GetChannel(channel)
        # write out its entire contents as a list
        self.write('{}'.format(listener[:]))
Finally, after the application is created, I added an empty dictionary to it as an attribute:
# registry of channel name -> listener thread, kept on the application so
# that request handlers can reach it through self.application.channels
application.channels = {}
I also added some cleanup of the running threads for when you exit the application:
# stop every listener thread and wait for it to finish
for listener in application.channels.values():
    listener.stop()
    listener.join()
The complete code:
import threading
import redis
import tornado.web
class OpenChannel(threading.Thread):
    """Background thread that collects messages published on one Redis channel.

    The constructor subscribes to ``channel``; once the thread is started,
    ``run`` appends the payload of every published message to ``self.output``.
    The list-style accessors below read ``self.output`` under ``self.lock``.

    channel -- name of the Redis channel to subscribe to
    host    -- Redis host, 'localhost' when omitted
    port    -- Redis port, 6379 when omitted
    """

    def __init__(self, channel, host = None, port = None):
        threading.Thread.__init__(self)
        self.lock = threading.Lock()
        self.redis = redis.StrictRedis(host = host or 'localhost', port = port or 6379)
        self.pubsub = self.redis.pubsub()
        self.pubsub.subscribe(channel)
        self.output = []

    # lets implement basic getter methods on self.output, so you can access it like a regular list
    def __getitem__(self, item):
        """Return ``self.output[item]``; ``item`` may be an index or a slice."""
        with self.lock:
            return self.output[item]

    def __getslice__(self, start, stop = None, step = None):
        """Python 2 slice hook; Python 3 routes slices through __getitem__."""
        with self.lock:
            return self.output[start:stop:step]

    def __str__(self):
        with self.lock:
            return str(self.output)

    # thread loop
    def run(self):
        """Store the payload of each published message until unsubscribed."""
        for message in self.pubsub.listen():
            # listen() also yields subscribe/unsubscribe confirmations whose
            # 'data' is a subscription count, not a payload -- skip those
            if message['type'] != 'message':
                continue
            with self.lock:
                self.output.append(message['data'])

    def stop(self):
        """Ask the listener loop to end so that join() can return.

        Unsubscribing makes the server send a confirmation; once no
        subscriptions remain, listen() stops iterating and run() returns.
        This replaces the private Thread._Thread__stop() hook, which does
        not exist on Python 3 and never actually ended the loop.
        """
        self.pubsub.unsubscribe()
# mixin for request handlers: hands out one shared listener per channel,
# creating and starting it the first time that channel is requested
class ApplicationMixin(object):
    def GetChannel(self, channel, host = None, port = None):
        """Return the listener for ``channel``, creating it on first use.

        Listeners are kept in ``self.application.channels``; ``host`` and
        ``port`` are only used when a new listener has to be created.
        """
        registry = self.application.channels
        if channel in registry:
            return registry[channel]
        listener = OpenChannel(channel, host, port)
        registry[channel] = listener
        listener.start()
        return listener
class ReadChannel(tornado.web.RequestHandler, ApplicationMixin):
    """Return everything received so far on the requested channel.

    The handler is fully synchronous: it reads the listener's list and
    returns, and Tornado finishes the response on its own. The
    @tornado.web.asynchronous decorator and the explicit finish() call were
    therefore redundant (and the decorator no longer exists in Tornado 6).
    """

    def get(self, channel):
        # get the channel
        listener = self.GetChannel(channel)
        # write out its entire contents as a list
        self.write('{}'.format(listener[:]))
class GetHandler(tornado.web.RequestHandler):
    """Answer GET with a fixed greeting."""

    def get(self):
        greeting = "Hello world"
        self.write(greeting)
# routing: "/" answers immediately, "/channel/<name>" dumps what the
# listener for that channel has collected so far
application = tornado.web.Application([
    (r"/", GetHandler),
    (r"/channel/(?P<channel>\S+)", ReadChannel),
])
# add a dictionary containing channels to your application
application.channels = {}

if __name__ == '__main__':
    application.listen(8888)
    # print() call form works on both Python 2 and Python 3
    print('running')
    try:
        tornado.ioloop.IOLoop.instance().start()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the server
        pass
    finally:
        # clean up the subscribed channels; in a finally block so the
        # listener threads are also stopped when the loop dies with an error
        for listener in application.channels.values():
            listener.stop()
            listener.join()