I'm writing an application which needs to run a series of tasks in parallel and then a single task with the results of all the tasks run:
@celery.task
def power(value, expo):
    return value ** expo

@celery.task
def amass(values):
    print str(values)
It's a very contrived and oversimplified example, but hopefully the point comes across well. Basically, I have many items which need to run through power, but I only want to run amass on the results from all of the tasks. All of this should happen asynchronously, and I don't need anything back from the amass method.
Does anyone know how to set this up in celery so that everything is executed asynchronously and a single callback with a list of the results is called after all is said and done?
I've set up this example to run with a chord as Alexander Afanasiev recommended:
from celery import chord
from time import sleep
import random

tasks = []
for i in xrange(10):
    tasks.append(power.s(i, 2))
    sleep(random.randint(10, 1000) / 1000.0)  # sleep for 10-1000ms

callback = amass.s()
r = chord(tasks)(callback)
Unfortunately, in the above example, all tasks in tasks are started only when the chord method is called. Is there a way that each task can start separately and then I could add a callback to the group to run when everything has finished?
Celery has plenty of tools for most workflows you can imagine.
It seems you need to make use of a chord. Here's a quote from the docs:
A chord is just like a group but with a callback. A chord consists of
a header group and a body, where the body is a task that should
execute after all of the tasks in the header are complete.
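For reference, a minimal sketch of that pattern with the power and amass tasks from your question might look like this (assuming both tasks are registered with your Celery app):

from celery import chord, group

# header: a group of power signatures; body: the amass callback,
# which receives the list of results once every header task finishes
header = group(power.s(i, 2) for i in xrange(10))
result = chord(header)(amass.s())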
Here's a solution which worked for my purposes:
tasks.py:
from time import sleep
import random

@celery.task
def power(value, expo):
    sleep(random.randint(10, 1000) / 1000.0)  # sleep for 10-1000ms
    return value ** expo

@celery.task
def amass(results, tasks):
    completed_tasks = []
    for task in tasks:
        if task.ready():
            completed_tasks.append(task)
            results.append(task.get())

    # remove completed tasks
    tasks = list(set(tasks) - set(completed_tasks))

    if len(tasks) > 0:
        # resend the task to execute at least 1 second from now
        # (countdown has to go to apply_async; passing it to delay would
        # forward it to amass as a keyword argument)
        amass.apply_async((results, tasks), countdown=1)
    else:
        # we're done
        print results
Use Case:
tasks = []
for i in xrange(10):
    tasks.append(power.delay(i, 2))

amass.delay([], tasks)
What this should do is start all of the tasks as soon as possible, asynchronously. Once they've all been posted to the queue, the amass task will also be posted to the queue. The amass task will keep reposting itself until all of the other tasks have been completed.
Taking a look at this snippet from your question, it looks like you are passing a list as the chord header, rather than a group:
from time import sleep
import random

tasks = []
for i in xrange(10):
    tasks.append(power.s(i, 2))
    sleep(random.randint(10, 1000) / 1000.0)  # sleep for 10-1000ms

callback = amass.s()
r = chord(tasks)(callback)
Converting the list to a group should result in the behaviour you're expecting:
...
callback = amass.s()
tasks = group(tasks)
r = chord(tasks)(callback)
The answer that @alexander-afanasiev gave you is essentially right: use a chord.
Your code is OK, but tasks.append(power.s(i, 2)) is not actually executing the subtask, just adding subtasks to a list. It's chord(...)(...) that sends as many messages to the broker as there are subtasks defined in the tasks list, plus one more message for the callback subtask. When you call chord it returns as soon as it can.
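To make that distinction concrete, here is a small sketch (just an illustration using the tasks from your question): .s() only builds a signature locally, while .delay() and applying the chord are what actually send messages to the broker.

from celery import chord, group

sig = power.s(3, 2)      # builds a signature; nothing is sent to the broker yet
res = power.delay(3, 2)  # sends one message to the broker immediately

# nothing is sent while the signatures are collected...
header = group(power.s(i, 2) for i in xrange(10))
# ...until the chord is applied: ten header messages plus one for the callback
r = chord(header)(amass.s())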
If you want to know when the chord has finished, you can poll for completion as you would with a single task, using r.ready() from your sample.
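For example, a minimal polling sketch reusing r from your sample (just one way to do it; r.get() would block instead of polling):

from time import sleep

# r is the AsyncResult of the amass callback returned by chord(tasks)(callback)
while not r.ready():
    sleep(0.5)  # poll until the chord's callback has finished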