Django的&芹菜 - 路由问题(Django & Celery — Routing proble

2019-07-29 12:21发布

我使用Django和芹菜,我想设置路由到多个队列。 当我指定任务的routing_keyexchange (在任务装饰或使用apply_async()任务未添加到代理(这是海带连接到我的MySQL数据库)。

如果我指定的任务装饰队列名称(这将意味着路由按键被忽略),任务工作正常。 这似乎是与路由/交换设置有问题。

任何想法的问题可能是什么?

这里的设置:

settings.py

INSTALLED_APPS = (
    ...
    'kombu.transport.django',
    'djcelery',
)
BROKER_BACKEND = 'django'
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = "tasks"
CELERY_DEFAULT_EXCHANGE_TYPE = "topic"
CELERY_DEFAULT_ROUTING_KEY = "task.default"
CELERY_QUEUES = {
    'default': {
        'binding_key':'task.#',
    },
    'i_tasks': {
        'binding_key':'important_task.#',
    },
}

tasks.py

from celery.task import task

@task(routing_key='important_task.update')
def my_important_task():
    try:
        ...
    except Exception as exc:
        my_important_task.retry(exc=exc)

启动任务:

from tasks import my_important_task
my_important_task.delay()

Answer 1:

您正在使用Django的ORM作为一个经纪人,这意味着声明只存储在内存中(见,不容争辩难找,交通比较表在http://readthedocs.org/docs/kombu/en/latest/introduction.html #传输比较 )

所以,当你申请与routing_key这个任务important_task.update它将无法航线它,因为它并没有宣布队列呢。

如果你这样做,将工作:

@task(queue="i_tasks", routing_key="important_tasks.update")
def important_task():
    print("IMPORTANT")

但它会更简单,为您使用自动路由功能,因为这里有什么,显示你需要使用一个“主题”交流,使用自动路由只需删除的设置

  • CELERY_DEFAULT_QUEUE
  • CELERY_DEFAULT_EXCHANGE
  • CELERY_DEFAULT_EXCHANGE_TYPE
  • CELERY_DEFAULT_ROUTING_KEY
  • CELERY_QUEUES

并宣布你的任务是这样的:

@task(queue="important")
def important_task():
    return "IMPORTANT"

然后启动工作从队列中消耗:

$ python manage.py celeryd -l info -Q important

或从两个缺省值(消耗celery )队列和important队列:

$ python manage.py celeryd -l info -Q celery,important

另一种很好的做法是将队列名称不是硬编码到任务和使用CELERY_ROUTES改为:

@task
def important_task():
    return "DEFAULT"

那么在您的设置:

CELERY_ROUTES = {"myapp.tasks.important_task": {"queue": "important"}}

如果你仍然坚持使用话题的交流,那么你可以添加这个路由器自动申报所有队列的第一次任务发送:

class PredeclareRouter(object):
    setup = False

    def route_for_task(self, *args, **kwargs):
        if self.setup:
            return
        self.setup = True
        from celery import current_app, VERSION as celery_version
        # will not connect anywhere when using the Django transport
        # because declarations happen in memory.
        with current_app.broker_connection() as conn:
            queues = current_app.amqp.queues
            channel = conn.default_channel
            if celery_version >= (2, 6):
                for queue in queues.itervalues():
                    queue(channel).declare()
            else:
                from kombu.common import entry_to_queue
                for name, opts in queues.iteritems():
                    entry_to_queue(name, **opts)(channel).declare()
CELERY_ROUTES = (PredeclareRouter(), )


文章来源: Django & Celery — Routing problems