Problem
I've segmented a long-running task into logical subtasks, so I can report the results of each subtask as it completes. However, I'm trying to report the results of a task that will effectively never complete (instead yielding values as it goes), and am struggling to do so with my existing solution.
Background
I'm building a web interface to some Python programs I've written. Users can submit jobs through web forms, then check back to see the job's progress.
Let's say I have two functions, each accessed via a separate form:
med_func: Takes ~1 minute to execute; its results are passed off to render(), which produces additional data.
long_func: Returns a generator. Each yield takes on the order of 30 minutes and should be reported to the user. There are so many yields that we can consider this iterator infinite (terminating only when revoked).
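For concreteness, the two functions might look roughly like this (a hypothetical sketch; slow_step stands in for the real work):

def med_func(form):
    ...                            # ~1 minute of work
    return results                 # handed to render() afterwards

def long_func(form):
    while True:                    # effectively infinite
        value = slow_step(form)    # ~30 minutes per iteration
        if value is not None:
            yield value            # should be shown to the user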
Code, current implementation
With med_func, I report results as follows:
On form submission, I save an AsyncResult to a Django session:
task_result = med_func.apply_async([form], link=render.s())
request.session["task_result"] = task_result
The Django view for the results page accesses this AsyncResult. When a task has completed, its results are saved into an object that is passed as context to a Django template.
from django.shortcuts import render_to_response

def results(request):
    """ Serve (possibly incomplete) results of a session's latest run. """
    session = request.session
    try:  # Load most recent task
        task_result = session["task_result"]
    except KeyError:  # Already cleared, or doesn't exist
        if "results" not in session:
            session["status"] = "No job submitted"
    else:  # Extract data from Asynchronous Tasks
        session["status"] = task_result.status
        if task_result.ready():
            session["results"] = task_result.get()
            render_task = task_result.children[0]
            # Decorate with rendering results
            session["render_status"] = render_task.status
            if render_task.ready():
                session["results"].render_output = render_task.get()
                del request.session["task_result"]  # Don't need it any more
    return render_to_response('results.html', request.session)
This solution only works when the function actually terminates. I can't chain together logical subtasks of long_func, because there is an unknown number of yields (each iteration of long_func's loop may not produce a result).
Question
Is there any sensible way to access yielded objects from an extremely long-running Celery task, so that they can be displayed before the generator is exhausted?
A couple of options to consider:
1 -- task groups. If you can enumerate all the subtasks at the time of invocation, you can apply the group as a whole -- that returns a TaskSetResult object you can use to monitor the results of the group as a whole, or of individual tasks in the group -- query this as needed when you want to check status. (See the first sketch after this list.)
2 -- callbacks. If you can't enumerate all the subtasks (or even if you can!), you can define a web hook / callback that's the last step in the task -- called when the rest of the task completes. The hook would be against a URI in your app that ingests the result and makes it available via DB or an app-internal API. (See the second sketch below.)
Some combination of these could solve your challenge.
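A minimal sketch of the group approach, assuming a hypothetical process_chunk subtask and inputs that can be split up front (recent Celery returns a GroupResult, the successor to TaskSetResult, but the usage is the same):

from celery import group

# process_chunk and chunks are hypothetical; assumes the work splits cleanly.
job = group(process_chunk.s(chunk) for chunk in chunks)
group_result = job.apply_async()

# Poll as needed: collect whatever has finished so far.
finished = [r.get() for r in group_result.results if r.ready()]

And a sketch of the callback idea, assuming a hypothetical /api/task-results/ endpoint in your app (some_step and form are stand-ins for the actual task and its arguments):

import requests
from celery import shared_task

@shared_task
def report_result(result):
    # Hypothetical endpoint that stores the result for later display.
    requests.post("http://localhost:8000/api/task-results/", json={"result": result})

# Run the real work, then hand its return value to the callback.
some_step.apply_async([form], link=report_result.s())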
Paul's answer is great. As an alternative to using mark_as_started, you can use Task's update_state method. They ultimately do the same thing, but the name "update_state" is a little more appropriate for what you're trying to do. You can optionally define a custom state that indicates your task is in progress (I've named my custom state 'PROGRESS'):
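A minimal sketch of the pattern, assuming a hypothetical do_work() generator standing in for long_func's loop:

from celery import Celery

app = Celery('tasks', broker='amqp://')  # broker URL is an assumption

@app.task(bind=True)
def long_func_task(self, form):
    for i, value in enumerate(do_work(form)):  # do_work is hypothetical
        # Record the latest yielded value as task metadata under the custom state.
        self.update_state(state='PROGRESS',
                          meta={'yielded': i + 1, 'latest': value})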
Personally, I would want to see the start time, duration, progress (number of items yielded), stop time (or ETA), status, and any other helpful information. It would be nice if it looked similar to a related display, maybe like ps on Linux. It is, after all, a process status.
You could include some options to pause or kill the task, and/or to "open" it and display detailed information about the children or results.
Celery part:
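A minimal sketch, assuming Django's cache as the shared store and a hypothetical make_generator() producing the values:

from celery import shared_task
from django.core.cache import cache

@shared_task
def long_func(form, task_id):
    for value in make_generator(form):  # make_generator is hypothetical
        # Publish the latest yielded value where the web process can read it.
        cache.set('long_func_%s' % task_id, value, timeout=None)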
Webclient side, starting task:
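Again a sketch; the key just has to be something both the web process and the worker can reconstruct:

import uuid

def submit(request):
    task_id = uuid.uuid4().hex
    long_func.apply_async([request.POST, task_id])
    request.session['long_task_id'] = task_id  # remember the key for polling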
Testing last yielded value:
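A sketch of the polling side; cache.get() returns None until the first yield has landed:

from django.core.cache import cache

def latest(request):
    task_id = request.session.get('long_task_id')
    value = cache.get('long_func_%s' % task_id)
    # value is the most recent yield, or None if nothing has arrived yet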
If you don't want to use the cache, or it's not possible, you can use a database, a file, or any other place that both the Celery worker and the server side can access. The cache is the simplest solution, but the workers and the server have to use the same cache.
In order for Celery to know what the current state of the task is, it sets some metadata in whatever result backend you have. You can piggy-back on that to store other kinds of metadata.
I wouldn't throw a ton of data in there, but it works well for tracking the progress of a long-running task.
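For example (a sketch, assuming the custom 'PROGRESS' state and meta dict from the earlier answer):

from celery.result import AsyncResult

result = AsyncResult(task_id)  # task_id saved when the task was submitted
if result.state == 'PROGRESS':
    print(result.info)  # the meta dict passed to update_state()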
See also this great PyCon presentation from one of the Instagram engineers:
http://blogs.vmware.com/vfabric/2013/04/how-instagram-feeds-work-celery-and-rabbitmq.html
Around the 16:00 mark, he discusses how they structure long lists of subtasks.