Why does celery add thousands of queues to rabbitm

2019-02-04 09:16发布

问题:

I am using celery with a rabbitmq backend. It is producing thousands of queues with 0 or 1 items in them in rabbitmq like this:

$ sudo rabbitmqctl list_queues
Listing queues ...
c2e9b4beefc7468ea7c9005009a57e1d        1
1162a89dd72840b19fbe9151c63a4eaa        0
07638a97896744a190f8131c3ba063de        0
b34f8d6d7402408c92c77ff93cdd7cf8        1
f388839917ff4afa9338ef81c28aad75        0
8b898d0c7c7e4be4aa8007b38ccc00ea        1
3fb4be51aaaa4ac097af535301084b01        1

This seems to be inefficient, but further I have observed that these queues persist long after processing is finished.

I have found the task that appears to be doing this:

@celery.task(ignore_result=True)
def write_pages(page_generator):  
    g = group(render_page.s(page) for page in page_generator)
    res = g.apply_async()

    for rendered_page in res:
        print rendered_page # TODO: print to file

It seems that because these tasks are being called in a group, they are being thrown into the queue but never being released. However, I am clearly consuming the results (as I can view them being printed when I iterate through res. So, I do not understand why those tasks are persisting in the queue.

Additionally, I am wondering if the large number queues that are being created is some indication that I am doing something wrong.

Thanks for any help with this!

回答1:

Celery with the AMQP backend will store task tombstones (results) in an AMQP queue named with the task ID that produced the result. These queues will persist even after the results are drained.

A couple recommendations:

  • Apply ignore_result=True to every task you can. Don't depend on results from other tasks.
  • Switch to a different backend (perhaps Redis -- it's more efficient anyway): http://docs.celeryproject.org/en/latest/userguide/tasks.html


回答2:

Use CELERY_TASK_RESULT_EXPIRES (or on 4.1 CELERY_RESULT_EXPIRES) to have a periodic cleanup task remove old data from rabbitmq.

http://docs.celeryproject.org/en/master/userguide/configuration.html#std:setting-result_expires