Retrying failed Celery tasks that are part of a chain

Published 2019-01-25 06:18

I have a celery chain that runs some tasks. Each of the tasks can fail and be retried. Please see below for a quick example:

from celery import task

@task(ignore_result=True)
def add(x, y, fail=True):
    try:
        if fail:
            raise Exception('Ugly exception.')
        print '%d + %d = %d' % (x, y, x+y)
    except Exception as e:
        raise add.retry(args=(x, y, False), exc=e, countdown=10)

@task(ignore_result=True)
def mul(x, y):
    print '%d * %d = %d' % (x, y, x*y)

and the chain:

from celery.canvas import chain
chain(add.si(1, 2), mul.si(3, 4)).apply_async()

Running the two tasks (assuming nothing fails), you would see printed:

1 + 2 = 3
3 * 4 = 12

However, when the add task fails on the first attempt and succeeds on a subsequent retry, the rest of the tasks in the chain do not run. That is, the add task fails, the remaining tasks in the chain are skipped, and after a few seconds the add task runs again and succeeds, but the rest of the chain (in this case mul.si(3, 4)) is never executed.

Does Celery provide a way to continue a failed chain from the task that failed onwards? If not, what would be the best approach to accomplish this, ensuring that a chain's tasks run in the specified order and only after the previous task has completed successfully, even if that task is retried a few times?

Note 1: The issue can be solved by doing

add.delay(1, 2).get()
mul.delay(3, 4).get()

but I am interested in understanding why chains do not work with failed tasks.
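
For comparison, here is a minimal sketch (not part of the original question) of the same task written as a bound task. It assumes a newer Celery release where self.retry() re-sends the original request together with its chain options, so that mul.si(3, 4) would still run once the retry succeeds:

from celery import shared_task

@shared_task(bind=True, ignore_result=True, max_retries=3)
def add(self, x, y, fail=True):
    try:
        if fail:
            raise Exception('Ugly exception.')
        print('%d + %d = %d' % (x, y, x + y))
    except Exception as exc:
        # Retry with fail=False so the second attempt succeeds; the retried
        # message is assumed to keep the request's chain/callback options.
        raise self.retry(args=(x, y, False), exc=exc, countdown=10)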

Answer by Fickle 薄情 · 2019-01-25 07:04

I'm also interested in understanding why chains do not work with failed tasks.

I dug into some of the Celery code and here is what I've found so far:

The implementation lives in celery/app/builtins.py:

@shared_task
def add_chain_task(app):
    from celery.canvas import chord, group, maybe_subtask
    _app = app

    class Chain(app.Task):
        app = _app
        name = 'celery.chain'
        accept_magic_kwargs = False

        def prepare_steps(self, args, tasks):
            steps = deque(tasks)
            next_step = prev_task = prev_res = None
            tasks, results = [], []
            i = 0
            while steps:
                # First task get partial args from chain.
                task = maybe_subtask(steps.popleft())
                task = task.clone() if i else task.clone(args)
                i += 1
                tid = task.options.get('task_id')
                if tid is None:
                    tid = task.options['task_id'] = uuid()
                res = task.type.AsyncResult(tid)

                # automatically upgrade group(..) | s to chord(group, s)
                if isinstance(task, group):
                    try:
                        next_step = steps.popleft()
                    except IndexError:
                        next_step = None
                if next_step is not None:
                    task = chord(task, body=next_step, task_id=tid)
                if prev_task:
                    # link previous task to this task.
                    prev_task.link(task)
                    # set the results parent attribute.
                    res.parent = prev_res

                results.append(res)
                tasks.append(task)
                prev_task, prev_res = task, res

            return tasks, results

        def apply_async(self, args=(), kwargs={}, group_id=None, chord=None,
                task_id=None, **options):
            if self.app.conf.CELERY_ALWAYS_EAGER:
                return self.apply(args, kwargs, **options)
            options.pop('publisher', None)
            tasks, results = self.prepare_steps(args, kwargs['tasks'])
            result = results[-1]
            if group_id:
                tasks[-1].set(group_id=group_id)
            if chord:
                tasks[-1].set(chord=chord)
            if task_id:
                tasks[-1].set(task_id=task_id)
                result = tasks[-1].type.AsyncResult(task_id)
            tasks[0].apply_async()
            return result

        def apply(self, args=(), kwargs={}, **options):
            tasks = [maybe_subtask(task).clone() for task in kwargs['tasks']]
            res = prev = None
            for task in tasks:
                res = task.apply((prev.get(), ) if prev else ())
                res.parent, prev = prev, res
            return res
    return Chain

You can see that at the end of prepare_steps, prev_task is linked to the next task; link registers a success callback, so when prev_task fails, the next task is never called.
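
For reference, here is a minimal sketch (not from the original answer) of how link and link_error behave on an ordinary apply_async call, using the add/mul tasks from the question plus a hypothetical log_error errback task; on Celery 3.x an errback is called with the id of the failed task:

from celery import task

@task(ignore_result=True)
def log_error(task_id):
    # Celery 3.x passes the failed task's id to errbacks.
    print('task %s failed' % task_id)

# With fail=False the add task succeeds, so mul runs; if add failed,
# log_error would run instead of mul.
add.apply_async((1, 2), {'fail': False},
                link=mul.si(3, 4), link_error=log_error.s())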

I'm experimenting with also adding a link_error from the previous task to the next:

if prev_task:
    # link (success callback) and link_error (failure callback)
    # from the previous task to this task.
    prev_task.link(task)
    prev_task.link_error(task)
    # set the results parent attribute.
    res.parent = prev_res

But then the next task must handle both cases (except perhaps when it is configured to be immutable, i.e. it does not accept additional arguments).
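
For illustration (not part of the original answer), the difference between a mutable and an immutable signature when used as a callback:

mul.s(3)      # mutable: called as mul(<parent result>, 3)
mul.si(3, 4)  # immutable: always called as mul(3, 4); the parent result
              # (or the errback argument) is not passed in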

I think chain could support this by allowing syntax like:

c = chain(t1, (t2, t1e), (t3, t2e))

which would mean:

t1 links to t2 on success and to t1e on error

t2 links to t3 on success and to t2e on error
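
A rough approximation with the existing canvas API would be to attach a per-task errback to each signature before building the chain; t1e and t2e below are hypothetical error-handler tasks, and this assumes Signature.link_error keeps its effect after the chain clones the signatures:

from celery.canvas import chain

# t1e and t2e are hypothetical error-handler tasks defined elsewhere.
s1 = add.si(1, 2)
s1.link_error(t1e.s())
s2 = mul.si(3, 4)
s2.link_error(t2e.s())

# prepare_steps adds the success links (s1 -> s2); the error links stay per task.
chain(s1, s2).apply_async()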
