I have a check_orders task that's executed periodically. It makes a group of tasks so that I can time how long executing the tasks took, and perform something when they're all done (this is the purpose of res.join [1] and grouped_subs) The tasks that are grouped are pairs of chained tasks.
What I want is for when the first task doesn't meet a condition (fails) don't execute the second task in the chain. I can't figure this out for the life of me and I feel this is pretty basic functionality for a job queue manager. When I try the things I have commented out after [2] (raising exceptions, removing callbacks)... we get stuck on the join() in check_orders for some reason (it breaks the group). I've tried setting ignore_result to False as well for all these tasks but it still doesn't work.
@task(ignore_result=True)
def check_orders():
# check all the orders and send out appropriate notifications
grouped_subs = []
for thingy in things:
...
grouped_subs.append(chain(is_room_open.subtask((args_sub_1, )),
notify.subtask((args_sub_2, ), immutable=True)))
res = group(grouped_subs).apply_async()
res.join() #[1]
logger.info('Done checking orders at %s' % current_task.request.id))
@task(ignore_result=True)
def is_room_open(args_sub_1):
#something time consuming
if http_req_and_parse(args_sub_1):
# go on and do the notify task
return True
else:
# [2]
# STOP THE CHAIN SOMEHOW! Don't execute the rest of the chain, how?
# None of the following things work:
# is_room_open.update_state(state='FAILURE')
# raise celery.exceptions.Ignore()
# raise Exception('spam', 'eggs')
# current_task.request.callbacks[:] = []
@task(ignore_result=True)
def notify(args_sub_2):
# something else time consuming, only do this if the first part of the chain
# passed a test (the chained tasks before this were 'successful'
notify_user(args_sub_2)
In my opinion this is a common use-case that doesn't get enough love in the documentation.
Assuming you want to abort a chain mid-way while still reporting SUCCESS as status of the completed tasks, and not sending any error log or whatnot (else you can just raise an exception) then a way to accomplish this is:
So in your example:
Will work. Note that instead of
ignore_result=True
andsubtask()
you can use the shortcut.si()
as stated by @abbasov-alexanderEdited to work with EAGER mode, as suggested by @PhilipGarnero in the comments.
It's unbelievable as a so common case isn't treated in any official documentation. I had to cope with the same issue (but using
shared_tasks
withbind
option, so we have visibility ofself
object), so I wrote a custom decorator that handles automatically the revocation:You can use it as follows:
See the full explanation here. Hope it helps!
Firstly, it seems if into the function exists exception
ignore_result
don't help you.Secondly, you use immutable=True It means that next function (in our case is notify) does not take additional arguments. You should use
notify.subtask((args_sub_2, ), immutable=False)
of course if it suitable for your decision.Third, you can use shortcuts:
notify.si(args_sub_2)
insteadnotify.subtask((args_sub_2, ), immutable=True)
and
is_room_open.s(args_sub_1)
insteadis_room_open.subtask((args_sub_1, ))
Try use it code:
If you want catch exceptions you must use callback as so
is_room_open.s(args_sub_1, link_error=log_error.s())