Celery: Callback after task hierarchy

Posted 2019-02-16 23:59

Question:

I'm using Celery from a webapp to start a task hierarchy.

Tasks

I'm using the following tasks:

  • task_a
  • task_b
  • task_c
  • notify_user

A Django view starts several task_a instances. Each of them does some processing and then starts several task_b instances. And each of those does some processing and then starts several task_c instances.

To visualize:

Goals

My goal is to execute all the tasks, and to run a callback function as soon as the entire hierarchy has finished. Additionally, I want to be able to pass data from the lowest tasks to the top level.

  1. The view should just "kick off" the tasks and then return.
  2. Each subtask depends on the parent task. The parent task does not depend directly on the child task. After a parent task has started all the child tasks, it can be stopped.
  3. Everything can be parallelized, as long as the parent task runs before the child task is started.
  4. After all the tasks have finished, the notify_user callback function should be called.
  5. The notify_user callback function needs access to the data returned by the task_c instances.

All the tasks should be non-blocking, so task_b should not wait for all the task_c subtasks to finish.

What would be the right way to achieve the goal stated above?

Answer 1:

The solution turned out to be the dynamic task feature provided in this pull request: https://github.com/celery/celery/pull/817. With this, each task can return a group of subtasks, which will then replace the original task in the queue.
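To make the replacement idea concrete, here is a toy model in plain Python. It is not Celery code and does not use the API from the pull request; the names run_dynamic, make_a, make_b and make_c are hypothetical, and a simple in-memory queue stands in for the broker. A task that returns a list of callables is treated as having spawned subtasks, which take its place in the queue.

```python
from collections import deque


def run_dynamic(root_tasks):
    """Run callables from a FIFO queue (a stand-in for the broker).

    A task that returns a list of callables is replaced by those callables;
    any other return value is recorded as a leaf result.
    """
    queue = deque(root_tasks)
    results = []
    while queue:
        task = queue.popleft()
        outcome = task()
        if isinstance(outcome, list):
            # The parent is done; its subtasks take its place in the queue.
            queue.extend(outcome)
        else:
            results.append(outcome)
    return results


# Hypothetical task factories mirroring task_c, task_b and task_a.
def make_c(name):
    return lambda: name


def make_b(names):
    return lambda: [make_c(n) for n in names]


def make_a(groups):
    return lambda: [make_b(g) for g in groups]


tree = [[["C1", "C2"], ["C3"]], [["C4"]]]
leaf_results = run_dynamic([make_a(groups) for groups in tree])
print(leaf_results)
```

No parent ever waits for its children here: it returns immediately after describing them, which is the property the question asks for.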



Answer 2:

Suppose you have these tasks:

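from celery import Celery, group

# The broker URL and result backend are example values; adjust them to your setup.
celery = Celery(
    broker="amqp://test:test@localhost:5672/test"
)
celery.conf.update(
    CELERY_RESULT_BACKEND="mongodb",
)


@celery.task
def task_a(result):
    # Receives the list of task_b results for one A branch.
    print('task_a:', result)
    return result

@celery.task
def task_b(result):
    # Receives the list of task_c results for one B branch.
    print('task_b:', result)
    return result

@celery.task
def task_c(result):
    # Leaf task: receives a single input value.
    print('task_c:', result)
    return result

@celery.task
def notify_user(result):
    # Final callback: receives the list of all task_a results.
    print(result)
    return result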

For the given input data (as you drew it):

    tree = [
        [["C1", "C2", "C3"], ["C4", "C5"]], [["C6", "C7", "C8"], ["C9"]]
    ]

You can do:

    a_group = []
    for ia, a in enumerate(tree):
        print("A%s:" % ia)
        b_group = []
        for ib, b in enumerate(a):
            print(" - B%s:" % ib)
            for c in b:
                print('   -', c)

            # All task_c calls of this branch run in parallel.
            c_group = group([task_c.s(c) for c in b])

            # group | task forms a chord: task_b runs once the group has finished.
            b_group.append(c_group | task_b.s())

        a_group.append(group(b_group) | task_a.s())

    final_task = group(a_group) | notify_user.s()

Its representation is (don't read it, it's ugly :)):

[[[__main__.task_c('C1'), __main__.task_c('C2'), __main__.task_c('C3')] | __main__.task_b(), [__main__.task_c('C4'), __main__.task_c('C5')] | __main__.task_b()] | __main__.task_a(), [[__main__.task_c('C6'), __main__.task_c('C7'), __main__.task_c('C8')] | __main__.task_b(), [__main__.task_c('C9')] | __main__.task_b()] | __main__.task_a()] | __main__.notify_user()

And the data passed into notify_user would be:

[[['C1', 'C2', 'C3'], ['C4', 'C5']], [['C6', 'C7', 'C8'], ['C9']]]
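To see why the payload has this shape, the data flow can be traced without a broker. The sketch below uses plain synchronous functions as stand-ins for the Celery tasks (an assumption for illustration only; nothing here runs in parallel). Each callback receives the list of results from the level below it and returns it unchanged, as the tasks in this answer do.

```python
# Plain-function stand-ins for the Celery tasks; each returns its input unchanged.
def task_c(value):
    return value


def task_b(c_results):
    return c_results


def task_a(b_results):
    return b_results


def notify_user(a_results):
    return a_results


tree = [
    [["C1", "C2", "C3"], ["C4", "C5"]], [["C6", "C7", "C8"], ["C9"]]
]

# Each level collects the results of the level below into a list
# and hands that list to its callback.
payload = notify_user([
    task_a([task_b([task_c(c) for c in b]) for b in a])
    for a in tree
])
print(payload)
```

Because every task returns what it was given, the nesting of the input tree is preserved all the way up to notify_user.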

Everything is run via callbacks (chords) so there are no tasks waiting for other tasks.
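
If notify_user only needs the task_c results and not the branch structure, the nested list can be flattened inside the callback. A minimal sketch, assuming the results are plain nested lists as shown above; flatten_leaves is a hypothetical helper, not part of Celery:

```python
def flatten_leaves(nested):
    """Yield every non-list item from arbitrarily nested lists, left to right."""
    for item in nested:
        if isinstance(item, list):
            yield from flatten_leaves(item)
        else:
            yield item


result = [[['C1', 'C2', 'C3'], ['C4', 'C5']], [['C6', 'C7', 'C8'], ['C9']]]
leaves = list(flatten_leaves(result))
print(leaves)
```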