Celery dynamic queue creation and routing

2019-04-06 23:30发布

问题:

I'm trying to call a task and create a queue for that task if it doesn't exist then immediately insert to that queue the called task. I have the following code:

@task
def greet(name):
    return "Hello %s!" % name


def run():
    result = greet.delay(args=['marc'], queue='greet.1',
        routing_key='greet.1')
    print result.ready()

then I have a custom router:

class MyRouter(object):

    def route_for_task(self, task, args=None, kwargs=None):
        if task == 'tasks.greet':
            return {'queue': kwargs['queue'],
                    'exchange': 'greet',
                    'exchange_type': 'direct',
                    'routing_key': kwargs['routing_key']}
        return None

this creates an exchange called greet.1 and a queue called greet.1 but the queue is empty. The exchange should be just called greet which knows how to route a routing key like greet.1 to the queue called greet.1.

Any ideas?

回答1:

When you do the following:

task.apply_async(queue='foo', routing_key='foobar')

Then Celery will take default values from the 'foo' queue in CELERY_QUEUES, or if it does not exist then automatically create it using (queue=foo, exchange=foo, routing_key=foo)

So if 'foo' does not exist in CELERY_QUEUES you will end up with:

queues['foo'] = Queue('foo', exchange=Exchange('foo'), routing_key='foo')

The producer will then declare that queue, but since you override the routing_key, actually send the message using routing_key = 'foobar'

This may seem strange but the behavior is actually useful for topic exchanges, where you publish to different topics.

It's harder to do what you want though, you can create the queue yourself and declare it, but that won't work well with automatic message publish retries. It would be better if the queue argument to apply_async could support a custom kombu.Queue instead that will be both declared and used as the destination. Maybe you could open an issue for that at http://github.com/celery/celery/issues