My question is probably pretty basic but still I can't get a solution in the official doc. I have defined a Celery chain inside my Django application, performing a set of tasks dependent from eanch other:
chain( tasks.apply_fetching_decision.s(x, y),
tasks.retrieve_public_info.s(z, x, y),
tasks.public_adapter.s())()
Obviously the second and the third tasks need the output of the parent, that's why I used a chain.
Now the question: I need to programmatically revoke the 2nd and the 3rd tasks if a test condition in the 1st task fails. How to do it in a clean way? I know I can revoke the tasks of a chain from within the method where I have defined the chain (see thisquestion and this doc) but inside the first task I have no visibility of subsequent tasks nor of the chain itself.
Temporary solution
My current solution is to skip the computation inside the subsequent tasks based on result of the previous task:
@shared_task
def retrieve_public_info(result, x, y):
if not result:
return []
...
@shared_task
def public_adapter(result, z, x, y):
for r in result:
...
But this "workaround" has some flaw:
- Adds unnecessary logic to each task (based on predecessor's result), compromising reuse
- Still executes the subsequent tasks, with all the resulting overhead
I haven't played too much with passing references of the chain to tasks for fear of messing up things. I admit also I haven't tried Exception-throwing approach, because I think that the choice of not proceeding through the chain can be a functional (thus non exceptional) scenario...
Thanks for helping!
As of Celery 4.0, what I found to be working is to remove the remaining tasks from the current task instance's request using the statement:
Let's say you have a chain of tasks
a.s() | b.s() | c.s()
. You can only access theself
variable inside a task if you bind the task by passingbind=True
as argument to the tasks' decorator.If
something_happened
is truthy,b
andc
wouldn't be executed.I think I found the answer to this issue: this seems the right way to proceed, indeed. I wonder why such common scenario is not documented anywhere, though.
For completeness I post the basic code snapshot:
Update
I implemented a more elegant way to cope with the issue and I want to share it with you. I am using a decorator called
revoke_chain_authority
, so that it can revoke automatically the chain without rewriting the code I previously described.This decorator can be used on a
shared task
as follows:Please note the use of
@wraps
. It is necessary to preserve the signature of the original function, otherwise this latter will be lost andcelery
will make a mess at calling the right wrapped task (e.g. it will call always the first registered function instead of the right one)