I have a situation similar to the one outlined here, except that instead of chaining tasks with multiple arguments, I want to chain tasks that return a dictionary with multiple entries.
This is, very loosely and abstractly, what I'm trying to do:
tasks.py
@task()
def task1(item1=None, item2=None):
    item3 = ...  # do some stuff with item1 and item2 to yield item3
    return_object = dict(item1=item1, item2=item2, item3=item3)
    return return_object

@task()
def task2(item1=None, item2=None, item3=None):
    item4 = ...  # do something with item1, item2, item3 to yield item4
    return_object = dict(item1=item1, item2=item2, item3=item3, item4=item4)
    return return_object
Working from ipython, I am able to call task1 individually and asynchronously, with no problems.
I can ALSO call task2 individually with the result returned by task1 as a double-star argument:
>>res1 = task1.s(item1=something, item2=something_else).apply_async()
>>res1.status
'SUCCESS'
>>res2 = task2.s(**res1.result).apply_async()
>>res2.status
'SUCCESS'
However, what I ultimately want to achieve is the same end result as above, but via a chain, and here I can't figure out how to have task2 instantiated not with the (positional) arguments returned by task1, but with task1's result as **kwargs:
chain_result = (task1.s(item1=something, item2=something_else) | task2.s()).apply_async() #THIS DOESN'T WORK!
I suspect that I could go back and rewrite my tasks so that they return positional arguments instead of a dictionary, and this might clear things up, but it seems to me that there should be some way to access task1's return object in task2 with the equivalent of the ** double-star functionality. I also suspect that I'm missing something fairly obvious here about either Celery's subtask implementation or *args vs. **kwargs.
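In other words, as far as I can tell the chain just hands task1's return value to task2 as a single positional argument, so the two calls end up looking something like this (my rough picture of what's going on, not actual Celery internals):

res1_result = dict(item1=1, item2=2, item3=3)

task2(res1_result)    # what the chain seems to do: the whole dict is bound to item1
task2(**res1_result)  # what I actually want: item1=1, item2=2, item3=3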
Hope this makes sense. And thanks in advance for any tips.
chain and the other canvas primitives are in the family of functional utilities like map and reduce.
E.g. where map(target, items) calls target(item) for every item in the list, Python has a rarely used version of map called itertools.starmap, which instead calls target(*item).
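For example, in plain Python (nothing Celery-specific here; add is just a throwaway function for illustration):

from itertools import starmap

def add(a, b):
    return a + b

pairs = [(1, 2), (3, 4)]

# map(add, pairs) would call add((1, 2)) and fail, since add needs two arguments;
# starmap unpacks each tuple for you:
list(starmap(add, pairs))  # [3, 7]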
While we could add starchain and even kwstarchain to the toolbox, these would be very specialized and probably not used very often.
Interestingly, Python has made these unnecessary with list and generator expressions, so that map is replaced with [target(item) for item in items] and starmap with [target(*item) for item in items].
So instead of implementing several alternatives for each primitive, I think we should focus on finding a more flexible way of supporting this, e.g. Celery-powered generator expressions (if that is possible, and if not, something similarly powerful).
This is my take on the problem, using an abstract task class:
from __future__ import absolute_import

from celery import Task

from myapp.tasks.celery import app


class ChainedTask(Task):
    abstract = True

    def __call__(self, *args, **kwargs):
        # If the previous task in the chain returned a dict, expand it into kwargs.
        if len(args) == 1 and isinstance(args[0], dict):
            kwargs.update(args[0])
            args = ()
        return super(ChainedTask, self).__call__(*args, **kwargs)


@app.task(base=ChainedTask)
def task1(x, y):
    return {'x': x * 2, 'y': y * 2, 'z': x * y}


@app.task(base=ChainedTask)
def task2(x, y, z):
    return {'x': x * 3, 'y': y * 3, 'z': z * 2}
You can now define and execute your chain like this:
from celery import chain
pipe = chain(task1.s(x=1, y=2) | task2.s())
pipe.apply_async()
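With a worker running and a result backend configured, the dict returned by task1 gets expanded into task2's keyword arguments, so (if my arithmetic is right) you should get something like:

result = pipe.apply_async()
result.get()  # {'x': 6, 'y': 12, 'z': 4}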
Since this isn't built into Celery, I wrote a decorator function to do something similar myself.
import functools

from celery import current_task


# Use this wrapper with functions in chains that return a tuple. The
# next function in the chain will get called with the contents of that
# tuple as (the first) positional args, rather than with the tuple
# itself as just the first arg. Note that both the sending and the
# receiving function must have this wrapper, which goes between the
# @task decorator and the function definition. This wrapper should not
# otherwise interfere when these conditions are not met.

class UnwrapMe(object):
    def __init__(self, contents):
        self.contents = contents

    def __call__(self):
        return self.contents


def wrap_for_chain(f):
    """ Too much deep magic. """
    @functools.wraps(f)
    def _wrapper(*args, **kwargs):
        if args and type(args[0]) == UnwrapMe:
            # Expand the wrapped tuple from the previous task into
            # individual positional arguments.
            args = list(args[0]()) + list(args[1:])
        result = f(*args, **kwargs)
        if type(result) == tuple and current_task.request.callbacks:
            # Another task follows in the chain, so wrap the tuple for it.
            return UnwrapMe(result)
        else:
            return result
    return _wrapper
Mine unwraps like the starchain concept, but you could easily modify it to unwrap kwargs instead.
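Here's a rough usage sketch (the app setup and task names are made up for illustration; adapt them to however your tasks are actually registered, and note that the UnwrapMe instance has to survive your task serializer, e.g. pickle):

from celery import Celery, chain

app = Celery('example')  # made-up app; use your own broker/backend settings

@app.task
@wrap_for_chain
def split_name(full_name):
    first, last = full_name.split(' ', 1)
    return first, last  # tuple gets wrapped for the next task in the chain

@app.task
@wrap_for_chain
def greet(first, last):
    return 'Hello, %s %s!' % (first, last)

res = chain(split_name.s('Jane Doe') | greet.s()).apply_async()
# with a worker running and a result backend configured:
# res.get() -> 'Hello, Jane Doe!'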