I'm using Django and Celery and I'm trying to setup routing to multiple queues. When I specify a task's routing_key
and exchange
(either in the task decorator or using apply_async()
), the task isn't added to the broker (which is Kombu connecting to my MySQL database).
If I specify the queue name in the task decorator (which will mean the routing key is ignored), the task works fine. It appears to be a problem with the routing/exchange setup.
Any idea what the problem could be?
Here's the setup:
settings.py
INSTALLED_APPS = (
...
'kombu.transport.django',
'djcelery',
)
BROKER_BACKEND = 'django'
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = "tasks"
CELERY_DEFAULT_EXCHANGE_TYPE = "topic"
CELERY_DEFAULT_ROUTING_KEY = "task.default"
CELERY_QUEUES = {
'default': {
'binding_key':'task.#',
},
'i_tasks': {
'binding_key':'important_task.#',
},
}
tasks.py
from celery.task import task
@task(routing_key='important_task.update')
def my_important_task():
try:
...
except Exception as exc:
my_important_task.retry(exc=exc)
Initiate task:
from tasks import my_important_task
my_important_task.delay()
You are using the Django ORM as a broker, which means declarations are only stored in memory (see the, inarguably hard to find, transport comparison table at http://readthedocs.org/docs/kombu/en/latest/introduction.html#transport-comparison)
So when you apply this task with routing_key
important_task.update
it will not be able to route it, because it hasn't declared the queue yet.It will work if you do this:
But it would be much simpler for you to use the automatic routing feature, since there's nothing here that shows you need to use a 'topic' exchange, to use automatic routing simply remove the settings:
CELERY_DEFAULT_QUEUE
,CELERY_DEFAULT_EXCHANGE
,CELERY_DEFAULT_EXCHANGE_TYPE
CELERY_DEFAULT_ROUTING_KEY
CELERY_QUEUES
And declare your task like this:
and then to start a worker consuming from that queue:
or to consume from both the default (
celery
) queue and theimportant
queue:Another good practice is to not hardcode the queue names into the task and use
CELERY_ROUTES
instead:then in your settings:
If you still insist on using topic exchanges then you could add this router to automatically declare all queues the first time a task is sent: