Handling long running tasks in pika / RabbitMQ

2019-01-21 05:29发布

We're trying to set up a basic directed queue system where a producer will generate several tasks and one or more consumers will grab a task at a time, process it, and acknowledge the message.

The problem is, the processing can take 10-20 minutes, and we're not responding to messages at that time, causing the server to disconnect us.

Here's some pseudo code for our consumer:

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    long_running_task(connection)
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

After the first task completes, an exception is thrown somewhere deep inside of BlockingConnection, complaining that the socket was reset. In addition, the RabbitMQ logs show that the consumer was disconnected for not responding in time (why it resets the connection rather than sending a FIN is strange, but we won't worry about that).

We searched around a lot because we believed this was the normal use case for RabbitMQ (having a lot of long running tasks that should be split up among many consumers), but it seems like nobody else really had this issue. Finally we stumbled upon a thread where it was recommended to use heartbeats and to spawn the long_running_task() in a separate thread.

So the code has become:

#!/usr/bin/env python
import pika
import time
import threading

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost',
        heartbeat_interval=20))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def thread_func(ch, method, body):
    long_running_task(connection)
    ch.basic_ack(delivery_tag = method.delivery_tag)

def callback(ch, method, properties, body):
    threading.Thread(target=thread_func, args=(ch, method, body)).start()

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

And this seems to work, but it's very messy. Are we sure that the ch object is thread safe? In addition, imagine that long_running_task() is using that connection parameter to add a task to a new queue (i.e. the first part of this long process is done, let's send the task on to the second part). So, the thread is using the connection object. Is that thread safe?

More to the point, what's the preferred way of doing this? I feel like this is very messy and possibly not thread safe, so maybe we're not doing it right. Thanks!

标签: rabbitmq pika
5条回答
姐就是有狂的资本
2楼-- · 2019-01-21 06:01
  1. You can periodic call connection.process_data_events() in your long_running_task(connection), this function will send heartbeat to server when it is been called, and keep the pika client away from close.
  2. Set the heartbeat value greater than call connection.process_data_events() period in your pika BlockingConnection.
查看更多
孤傲高冷的网名
3楼-- · 2019-01-21 06:11

I encounter the same problem you had.
My solution is:

  1. ture off the heartbeat on the server side
  2. evaluate the maximum time the task can possible take
  3. set the client heartbeat timeout to the time got from step2

Why this?

As i test with the following cases:

case one
  1. server heartbeat turn on, 1800s
  2. client unset

I still get error when task running for a very long time -- >1800

case two
  1. turn off server heartbeat
  2. turn off client heartbeat

There is no error on client side, except one problem--when the client crashes(my os restart on some faults), the tcp connection still can be seen at the Rabbitmq Management plugin. And it is confusing.

case three
  1. turn off server heartbeat
  2. turn on client heartbeat, set it to the foresee maximum run time

In this case, i can dynamic change every heatbeat on indivitual client. In fact, i set heartbeat on the machines crashed frequently.Moreover, i can see offline machine through the Rabbitmq Manangement plugin.

Environment

OS: centos x86_64
pika: 0.9.13
rabbitmq: 3.3.1

查看更多
干净又极端
4楼-- · 2019-01-21 06:20

Don't disable heartbeat.
The best solution is to run the task in a separate thread and , set the prefetch_count to 1 so that the consumer only gets 1 unacknowledged message using something like this channel.basic_qos(prefetch_count=1)

查看更多
Fickle 薄情
5楼-- · 2019-01-21 06:20

Please don't disable heartbeats!

As of Pika 0.12.0, please use the technique described in this example code to run your long-running task on a separate thread and then acknowledge the message from that thread.


NOTE: the RabbitMQ team monitors the rabbitmq-users mailing list and only sometimes answers questions on StackOverflow.

查看更多
够拽才男人
6楼-- · 2019-01-21 06:22

For now, your best bet is to turn off heartbeats, this will keep RabbitMQ from closing the connection if you're blocking for too long. I am experimenting with pika's core connection management and IO loop running in a background thread but it's not stable enough to release.

查看更多
登录 后发表回答