I have an AMQP server (RabbitMQ) that I would like to both publish and read from in a Tornado web server. To do this, I figured I would use an asynchronous amqp python library; in particular Pika (a variation of it that supposedly supports Tornado).
I have written code that appears to successfully read from the queue, except that at the end of the request, I get an exception (the browser returns fine):
[E 101219 01:07:35 web:868] Uncaught exception GET / (127.0.0.1)
HTTPRequest(protocol='http', host='localhost:5000', method='GET', uri='/', version='HTTP/1.1', remote_ip='127.0.0.1', remote_ip='127.0.0.1', body='', headers={'Host': 'localhost:5000', 'Accept-Language': 'en-us,en;q=0.5', 'Accept-Encoding': 'gzip,deflate', 'Keep-Alive': '115', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'User-Agent': 'Mozilla/5.0 (X11; U; Linux x86_64; en-US; rv:1.9.2.13) Gecko/20101206 Ubuntu/10.10 (maverick) Firefox/3.6.13', 'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7', 'Connection': 'keep-alive', 'Cache-Control': 'max-age=0', 'If-None-Match': '"58f554b64ed24495235171596351069588d0260e"'})
Traceback (most recent call last):
File "/home/dave/devel/lib/python2.6/site-packages/tornado/web.py", line 810, in _stack_context
yield
File "/home/dave/devel/lib/python2.6/site-packages/tornado/stack_context.py", line 77, in StackContext
yield
File "/usr/lib/python2.6/contextlib.py", line 113, in nested
yield vars
File "/home/dave/lib/python2.6/site-packages/tornado/stack_context.py", line 126, in wrapped
callback(*args, **kwargs)
File "/home/dave/devel/src/pika/pika/tornado_adapter.py", line 42, in _handle_events
self._handle_read()
File "/home/dave/devel/src/pika/pika/tornado_adapter.py", line 66, in _handle_read
self.on_data_available(chunk)
File "/home/dave/devel/src/pika/pika/connection.py", line 521, in on_data_available
self.channels[frame.channel_number].frame_handler(frame)
KeyError: 1
I'm not entirely sure I am using this library correctly, so I might be doing something blatantly wrong. The basic flow of my code is:
- Request comes in
- Create connection to RabbitMQ using TornadoConnection; specify a callback
- In connection callback, create a channel, declare/bind my queue, and call basic_consume; specify a callback
- In consume callback, close the channel and call Tornado's finish function.
- See exception.
My questions are a few:
- Is this flow even correct? I'm not sure what the purpose of the connection callback is except that it doesn't work if I don't use it.
- Should I be creating one AMQP connection per web request? RabbitMQ's documentation suggests that no, I should not but rather I should stick to creating just channels. But where would I create the connection, and how do I attempt reconnects should it go down briefly?
- If I am creating one AMQP connection per Web request, where should I be closing it? Calling amqp.close() in my callback seems to screw things up even more.
I will try to have some sample code up a little later, but the steps I described above lay out the consuming side of things fairly completely. I am having issues with the publishing side as well, but the consuming of queues is more pressing.
Someone has reported success in merging Tornado and Pika here. From what I can tell, it isn't as simple as just calling Pika from Tornado, since both libraries want to have their own event loops in charge.
It would help to see some source code, but I use this same tornado-supporting pika module without issue in more than one production project.
You don't want to create a connection per request. Create a class that wraps all of your AMQP operations, and instantiate it as a singleton at the tornado Application level that can be used across requests (and across request handlers). I do this in a 'runapp()' function that does some stuff like this and then starts the main tornado ioloop.
Here's a class called 'Events'. It's a partial implementation (specifically, I don't define 'self.handle_event' here. That's up to you.
And then I put that in a file called 'events.py'. My RequestHandlers and any back end code all utilize a 'common.py' module that wraps code that's useful to both (my RequestHandlers don't call any amqp module methods directly -- same for db, cache, etc as well), so I define 'events=None' at the module level in common.py, and I instantiate the Event object kinda like this:
Happy new year :-D