Deleting all pending tasks in celery / rabbitmq

2019-01-20 21:39发布

How can I delete all pending tasks without knowing the task_id for each task?

7条回答
你好瞎i
2楼-- · 2019-01-20 22:04

In Celery 3+

http://docs.celeryproject.org/en/3.1/faq.html#how-do-i-purge-all-waiting-tasks

CLI

Purge named queue:

 celery -A proj amqp queue.purge <queue name>

Purge configured queue

celery -A proj purge

I’ve purged messages, but there are still messages left in the queue? Answer: Tasks are acknowledged (removed from the queue) as soon as they are actually executed. After the worker has received a task, it will take some time until it is actually executed, especially if there are a lot of tasks already waiting for execution. Messages that are not acknowledged are held on to by the worker until it closes the connection to the broker (AMQP server). When that connection is closed (e.g. because the worker was stopped) the tasks will be re-sent by the broker to the next available worker (or the same worker when it has been restarted), so to properly purge the queue of waiting tasks you have to stop all the workers, and then purge the tasks using celery.control.purge().

So to purge the entire queue workers must be stopped.

查看更多
地球回转人心会变
3楼-- · 2019-01-20 22:05

In Celery 3+:

CLI:

$ celery -A proj purge

Programatically:

>>> from proj.celery import app
>>> app.control.purge()

http://docs.celeryproject.org/en/latest/faq.html#how-do-i-purge-all-waiting-tasks

查看更多
Emotional °昔
4楼-- · 2019-01-20 22:09

From the docs:

$ celery -A proj purge

or

from proj.celery import app
app.control.purge()

(EDIT: Updated with current method.)

查看更多
Fickle 薄情
5楼-- · 2019-01-20 22:10

I found that celery purge doesn't work for my more complex celery config. I use multiple named queues for different purposes:

$ sudo rabbitmqctl list_queues -p celery name messages consumers
Listing queues ...  # Output sorted, whitespaced for readability
celery                                          0   2
celery@web01.celery.pidbox                      0   1
celery@web02.celery.pidbox                      0   1
apns                                            0   1
apns@web01.celery.pidbox                        0   1
analytics                                       1   1
analytics@web01.celery.pidbox                   0   1
bcast.361093f1-de68-46c5-adff-d49ea8f164c0      0   1
bcast.a53632b0-c8b8-46d9-bd59-364afe9998c1      0   1
celeryev.c27b070d-b07e-4e37-9dca-dbb45d03fd54   0   1
celeryev.c66a9bed-84bd-40b0-8fe7-4e4d0c002866   0   1
celeryev.b490f71a-be1a-4cd8-ae17-06a713cc2a99   0   1
celeryev.9d023165-ab4a-42cb-86f8-90294b80bd1e   0   1

The first column is the queue name, the second is the number of messages waiting in the queue, and the third is the number of listeners for that queue. The queues are:

  • celery - Queue for standard, idempotent celery tasks
  • apns - Queue for Apple Push Notification Service tasks, not quite as idempotent
  • analytics - Queue for long running nightly analytics
  • *.pidbox - Queue for worker commands, such as shutdown and reset, one per worker (2 celery workers, one apns worker, one analytics worker)
  • bcast.* - Broadcast queues, for sending messages to all workers listening to a queue (rather than just the first to grab it)
  • celeryev.* - Celery event queues, for reporting task analytics

The analytics task is a brute force tasks that worked great on small data sets, but now takes more than 24 hours to process. Occasionally, something will go wrong and it will get stuck waiting on the database. It needs to be re-written, but until then, when it gets stuck I kill the task, empty the queue, and try again. I detect "stuckness" by looking at the message count for the analytics queue, which should be 0 (finished analytics) or 1 (waiting for last night's analytics to finish). 2 or higher is bad, and I get an email.

celery purge offers to erase tasks from one of the broadcast queues, and I don't see an option to pick a different named queue.

Here's my process:

$ sudo /etc/init.d/celeryd stop  # Wait for analytics task to be last one, Ctrl-C
$ ps -ef | grep analytics  # Get the PID of the worker, not the root PID reported by celery
$ sudo kill <PID>
$ sudo /etc/init.d/celeryd stop  # Confim dead
$ python manage.py celery amqp queue.purge analytics
$ sudo rabbitmqctl list_queues -p celery name messages consumers  # Confirm messages is 0
$ sudo /etc/init.d/celeryd start
查看更多
成全新的幸福
6楼-- · 2019-01-20 22:16

For celery 3.0+:

$ celery purge

To purge a specific queue:

$ celery -Q queue_name purge
查看更多
孤傲高冷的网名
7楼-- · 2019-01-20 22:18

1. To properly purge the queue of waiting tasks you have to stop all the workers (http://celery.readthedocs.io/en/latest/faq.html#i-ve-purged-messages-but-there-are-still-messages-left-in-the-queue):

$ sudo rabbitmqctl stop

or (in case RabbitMQ/message broker is managed by Supervisor):

$ sudo supervisorctl stop all

2. ...and then purge the tasks from a specific queue:

$ cd <source_dir>
$ celery amqp queue.purge <queue name>

3. Start RabbitMQ:

$ sudo rabbitmqctl start

or (in case RabbitMQ is managed by Supervisor):

$ sudo supervisorctl start all
查看更多
登录 后发表回答