I have two separate RabbitMQ instances. I'm trying to find the best way to listen to events from both.
For example, I can consume events on one with the following:
import pika

credentials = pika.PlainCredentials(user, password)
connection = pika.BlockingConnection(pika.ConnectionParameters(host="host1", credentials=credentials))
channel = connection.channel()
result = channel.queue_declare(exclusive=True)
channel.queue_bind(queue=result.method.queue, exchange="my-exchange", routing_key='*.*.*.*.*')
channel.basic_consume(callback_func, result.method.queue, no_ack=True)
channel.start_consuming()
I have a second host, "host2", that I'd like to listen to as well. I thought about creating two separate threads to do this, but from what I've read, pika isn't thread safe. Is there a better way? Or would creating two separate threads, each listening to a different Rabbit instance (host1 and host2), be sufficient?
The answer to "what is the best way" depends heavily on your usage pattern of queues and what you mean by "best". Since I can't comment on questions yet, I'll just try to suggest some possible solutions.
In each example I'm going to assume the exchange is already declared.
Threads
You can consume messages from two queues on separate hosts in a single process using pika.
You are right: as its own FAQ states, pika is not thread safe, but it can be used in a multi-threaded manner by creating a separate connection to a RabbitMQ host in each thread. Making this example run in threads using the threading module looks as follows:
import pika
import threading


class ConsumerThread(threading.Thread):
    def __init__(self, host, *args, **kwargs):
        super(ConsumerThread, self).__init__(*args, **kwargs)
        self._host = host

    # Not necessarily a method.
    def callback_func(self, channel, method, properties, body):
        print("{} received '{}'".format(self.name, body))

    def run(self):
        credentials = pika.PlainCredentials("guest", "guest")
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=self._host,
                                      credentials=credentials))
        channel = connection.channel()
        result = channel.queue_declare(exclusive=True)
        channel.queue_bind(result.method.queue,
                           exchange="my-exchange",
                           routing_key="*.*.*.*.*")
        channel.basic_consume(self.callback_func,
                              result.method.queue,
                              no_ack=True)
        channel.start_consuming()


if __name__ == "__main__":
    threads = [ConsumerThread("host1"), ConsumerThread("host2")]
    for thread in threads:
        thread.start()
I've declared callback_func as a method purely to use ConsumerThread.name while printing the message body. It might as well be a function outside the ConsumerThread class.
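The rule the example relies on is that each connection is created and used by exactly one thread. A minimal sketch of that ownership pattern, runnable without a broker, is below. FakeConnection is a hypothetical stand-in for pika.BlockingConnection (it is not part of pika), and the shared queue.Queue inbox is an assumption about how you might hand messages to the rest of the program; it uses Python 3's queue module.

```python
import queue
import threading


class FakeConnection(object):
    """Hypothetical stand-in for pika.BlockingConnection; yields canned messages."""

    def __init__(self, host):
        self.host = host

    def messages(self):
        for i in range(3):
            yield "{} message {}".format(self.host, i)


class ConsumerThread(threading.Thread):
    def __init__(self, host, inbox, *args, **kwargs):
        super(ConsumerThread, self).__init__(*args, **kwargs)
        self._host = host
        self._inbox = inbox

    def run(self):
        # The connection is created inside run(), so only this thread touches it.
        connection = FakeConnection(self._host)
        for body in connection.messages():
            # queue.Queue is thread safe, unlike the connection itself.
            self._inbox.put((self.name, body))


def consume_all(hosts):
    """Start one consumer thread per host and collect everything they received."""
    inbox = queue.Queue()
    threads = [ConsumerThread(host, inbox) for host in hosts]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    received = []
    while not inbox.empty():
        received.append(inbox.get())
    return received
```

Calling consume_all(["host1", "host2"]) returns a list of (thread name, body) pairs; the order across hosts is not guaranteed.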
Processes
Alternatively, you can always just run one process with the consumer code per queue you want to consume events from.
import pika
import sys


def callback_func(channel, method, properties, body):
    print(body)


if __name__ == "__main__":
    credentials = pika.PlainCredentials("guest", "guest")
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host=sys.argv[1],
                                  credentials=credentials))
    channel = connection.channel()
    result = channel.queue_declare(exclusive=True)
    channel.queue_bind(result.method.queue,
                       exchange="my-exchange",
                       routing_key="*.*.*.*.*")
    channel.basic_consume(callback_func, result.method.queue, no_ack=True)
    channel.start_consuming()
and then run it with:
$ python single_consume.py host1
$ python single_consume.py host2 # e.g. on another console
If the work you do on each message is CPU-heavy, and the number of cores in your CPU is at least the number of consumers, this approach is generally better, unless your queues are empty most of the time and the consumers won't use that CPU time*.
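If you would rather not open one console per host, a small launcher can start the processes for you. This is only a sketch: it assumes the consumer script above is saved as single_consume.py in the current directory, and the helper names consumer_command and launch_consumers are made up for illustration.

```python
import subprocess
import sys


def consumer_command(host, script="single_consume.py"):
    """Build the argv for one consumer process (same as typing it in a console)."""
    return [sys.executable, script, host]


def launch_consumers(hosts, script="single_consume.py"):
    """Start one OS process per host and return the Popen handles."""
    return [subprocess.Popen(consumer_command(host, script)) for host in hosts]
```

A typical use would be procs = launch_consumers(["host1", "host2"]) followed by calling wait() on each handle, so the launcher stays alive as long as the consumers do.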
Async
Another alternative is to involve an asynchronous framework (for example Twisted) and run the whole thing in a single thread.
You can no longer use BlockingConnection in asynchronous code; fortunately, pika has an adapter for Twisted:
from pika.adapters.twisted_connection import TwistedProtocolConnection
from pika.connection import ConnectionParameters
from twisted.internet import protocol, reactor, task
from twisted.python import log


class Consumer(object):
    def on_connected(self, connection):
        d = connection.channel()
        d.addCallback(self.got_channel)
        d.addCallback(self.queue_declared)
        d.addCallback(self.queue_bound)
        d.addCallback(self.handle_deliveries)
        d.addErrback(log.err)

    def got_channel(self, channel):
        self.channel = channel
        return self.channel.queue_declare(exclusive=True)

    def queue_declared(self, queue):
        self._queue_name = queue.method.queue
        # Return the Deferred so that queue_bound only runs once the bind is done.
        return self.channel.queue_bind(queue=self._queue_name,
                                       exchange="my-exchange",
                                       routing_key="*.*.*.*.*")

    def queue_bound(self, ignored):
        return self.channel.basic_consume(queue=self._queue_name)

    def handle_deliveries(self, queue_and_consumer_tag):
        queue, consumer_tag = queue_and_consumer_tag
        self.looping_call = task.LoopingCall(self.consume_from_queue, queue)
        return self.looping_call.start(0)

    def consume_from_queue(self, queue):
        d = queue.get()
        return d.addCallback(lambda result: self.handle_payload(*result))

    def handle_payload(self, channel, method, properties, body):
        print(body)


if __name__ == "__main__":
    consumer1 = Consumer()
    consumer2 = Consumer()
    parameters = ConnectionParameters()
    cc = protocol.ClientCreator(reactor,
                                TwistedProtocolConnection,
                                parameters)
    d1 = cc.connectTCP("host1", 5672)
    d1.addCallback(lambda protocol: protocol.ready)
    d1.addCallback(consumer1.on_connected)
    d1.addErrback(log.err)
    d2 = cc.connectTCP("host2", 5672)
    d2.addCallback(lambda protocol: protocol.ready)
    d2.addCallback(consumer2.on_connected)
    d2.addErrback(log.err)
    reactor.run()
This approach pays off more the more queues you consume from and the less CPU-bound the work performed by the consumers is*.
Python 3
Since you've mentioned pika
, I've restricted myself to Python 2.x-based solutions, because pika
is not yet ported.
But in case you would want to move to >=3.3, one possible option is to use asyncio
with one of AMQP protocol (the protocol you speak in with RabbitMQ) , e.g. asynqp
or aioamqp
.
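To give a feel for the asyncio shape without committing to a particular AMQP library, here is a minimal sketch of two consumers sharing one event loop in one thread. The asyncio.Queue objects are stand-ins for the per-host message sources (they are not asynqp or aioamqp objects), the None sentinel is an assumption used only to end the demo, and the async/await syntax with asyncio.run needs Python 3.7 or newer.

```python
import asyncio


async def consume(name, source, handle):
    """Pull messages from `source` until a None sentinel arrives."""
    while True:
        body = await source.get()
        if body is None:
            break
        handle(name, body)


async def main():
    # Stand-ins for the two hosts; a real program would wrap AMQP consumers here.
    host1, host2 = asyncio.Queue(), asyncio.Queue()
    received = []
    for i in range(2):
        host1.put_nowait("host1 message {}".format(i))
        host2.put_nowait("host2 message {}".format(i))
    host1.put_nowait(None)
    host2.put_nowait(None)

    def handle(name, body):
        received.append((name, body))

    # Both consumers run on the same event loop, in the same thread.
    await asyncio.gather(
        consume("consumer1", host1, handle),
        consume("consumer2", host2, handle),
    )
    return received
```

Running asyncio.run(main()) returns the (consumer name, body) pairs handled by both consumers.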
* Please note that these are very shallow tips. In most cases the choice is not that obvious; what is best for you depends on queue "saturation" (messages per unit of time), what work you do upon receiving these messages, what environment you run your consumers in, and so on. There's no way to be sure other than to benchmark all implementations.
Below is an example of how I use one RabbitMQ instance to listen to 2 queues at the same time:
import pika
import threading

threads = []


def client_info(channel):
    channel.queue_declare(queue='proxy-python')
    print(' [*] Waiting for client messages. To exit press CTRL+C')

    def callback(ch, method, properties, body):
        print(" Received %s" % (body))

    channel.basic_consume(callback, queue='proxy-python', no_ack=True)
    channel.start_consuming()


def scenario_info(channel):
    channel.queue_declare(queue='savi-virnet-python')
    print(' [*] Waiting for scenario messages. To exit press CTRL+C')

    def callback(ch, method, properties, body):
        print(" Received %s" % (body))

    channel.basic_consume(callback, queue='savi-virnet-python', no_ack=True)
    channel.start_consuming()


def manager():
    # One connection (and channel) per thread, since pika is not thread safe.
    connection1 = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel1 = connection1.channel()
    connection2 = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel2 = connection2.channel()

    t1 = threading.Thread(target=client_info, args=(channel1,))
    t1.daemon = True
    threads.append(t1)
    t1.start()

    t2 = threading.Thread(target=scenario_info, args=(channel2,))
    t2.daemon = True
    threads.append(t2)
    t2.start()

    for t in threads:
        t.join()


manager()
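Since client_info and scenario_info differ only in the queue name and the log text, they could be folded into one helper. The sketch below is one way to do that; consume_queue and its label parameter are names made up here, and the basic_consume argument order follows the older pika style used in this thread (newer pika releases changed that signature).

```python
def consume_queue(channel, queue_name, label):
    """Declare `queue_name` on `channel` and block while consuming from it."""
    channel.queue_declare(queue=queue_name)
    print(" [*] Waiting for {} messages. To exit press CTRL+C".format(label))

    def callback(ch, method, properties, body):
        print(" Received %s" % (body,))

    # Callback-first argument order, as in the examples above.
    channel.basic_consume(callback, queue=queue_name, no_ack=True)
    channel.start_consuming()
```

Each thread would then be created with something like threading.Thread(target=consume_queue, args=(channel1, 'proxy-python', 'client')), still with its own connection and channel.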