I'm stuck on a relatively complex Celery chain configuration, trying to achieve the following. Assume there's a chain of tasks like this:
chain1 = chain(
    DownloadFile.s("http://someserver/file.gz"),  # downloads file, returns temp file name
    UnpackFile.s(),  # unpacks the gzip-compressed file, returns temp file name
    ParseFile.s(),  # parses file, returns list of URLs to download
)
Now I want to download each URL in parallel, so what I did was:
urls = chain1().get()  # run the chain and wait for the parsed URL list
download_tasks = [DownloadFile.s(url) for url in urls]
res1 = celery.group(download_tasks)()
res1_data = res1.get()
Finally, I want to take each downloaded file (DownloadFile returns a temp file name for each of the URLs that ParseFile produced) and run it through another chain of tasks, in parallel (i.e. it will be a group of chains):
chains = []
for tmpfile in res1_data:  # res1_data holds the temp file names returned by DownloadFile
    chains.append(celery.chain(
        foo.s(tmpfile),
        bar.s(),
        baz.s(),
    ))
res2 = celery.group(*chains)()
res2_data = res2.get()
The approach works fine if I run it in a normal Python process (not another Celery task), because I can wait for the results from chain1, then construct the download task group and the new chains for each downloaded file.
However, now I want to wrap all of this into another Celery task, i.e. another @app.task-decorated function, and it turns out you can't (or really shouldn't) call .get() from inside a task to wait for another task to complete. I failed to find a way of porting this workflow to run inside a task. I tried to add res1 into the chain1 chain, but Celery complains that <GroupResult: ..... > is not JSON serializable.
Can anybody suggest a way to make it work? Thanks!
Indeed, it's bad to call .get() inside a task. The goal of Celery is to perform asynchronous tasks in parallel, so you should not wait for results. One way to solve your problem is to store the URL lists produced by your first processing step (either in files or in a database).
I wrote a short example of what you can do by writing results to files; I chose json dumping. Suppose you have a list of urls in your main. First you launch asynchronous processing of all those URLs with a group of chains. Each of those chains processes a URL and stores the list of URLs to download in a file located in a dedicated tmp directory. Then you also launch a check_dir task that checks whether files have been written to that directory and, if so, processes every file and deletes it from the tmp directory. With the parameters I chose, this task retries itself every 30 seconds and never ends (I assumed you had a recurrent job to execute), so you might want to change that, but it should give you an idea of how you could manage it.
I ran it as a main, but you can also wrap it into another Celery task if you want.

app_module.py
tasks.py
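A sketch of tasks.py along the lines described above. The task names mirror the question's DownloadFile/UnpackFile/ParseFile and foo/bar/baz; the tmp directory path, the dump_urls helper and the task bodies are assumptions, not a definitive implementation:

# tasks.py -- illustrative sketch; paths, names and task bodies are assumptions.
import json
import os
import uuid

from celery import chain

from app_module import app

TMP_DIR = "/tmp/parsed_urls"  # assumed location of the intermediate JSON files


@app.task
def download_file(url):
    # Download the URL to a temp file and return its name (details omitted).
    ...


@app.task
def unpack_file(tmpfile):
    # Unpack the gzip-compressed file and return the new temp file name.
    ...


@app.task
def parse_file(tmpfile):
    # Parse the file and return the list of URLs found in it.
    urls = []  # parsing details omitted
    return urls


@app.task
def dump_urls(urls):
    # Persist the parsed URL list as JSON so another task can pick it up later,
    # instead of calling .get() from inside a task.
    os.makedirs(TMP_DIR, exist_ok=True)
    path = os.path.join(TMP_DIR, "%s.json" % uuid.uuid4())
    with open(path, "w") as fh:
        json.dump(urls, fh)
    return path


@app.task
def foo(tmpfile):
    ...


@app.task
def bar(result):
    ...


@app.task
def baz(result):
    ...


@app.task(bind=True, default_retry_delay=30, max_retries=None)
def check_dir(self):
    # Look for JSON files written by the chains above; for every URL found,
    # launch the follow-up chain, then delete the file. Retry in 30 seconds
    # so the directory keeps being watched (this task never ends).
    if os.path.isdir(TMP_DIR):
        for name in os.listdir(TMP_DIR):
            path = os.path.join(TMP_DIR, name)
            with open(path) as fh:
                urls = json.load(fh)
            for url in urls:
                chain(download_file.s(url), foo.s(), bar.s(), baz.s()).delay()
            os.remove(path)
    raise self.retry()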
main.py
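And a sketch of main.py that wires it together; the initial URL list is just a placeholder:

# main.py -- illustrative sketch; the initial URL list is a placeholder.
from celery import chain, group

from tasks import check_dir, download_file, dump_urls, parse_file, unpack_file

if __name__ == "__main__":
    urls = [
        "http://someserver/file1.gz",
        "http://someserver/file2.gz",
    ]

    # One chain per source URL; each chain ends by dumping the parsed URL list
    # to a JSON file instead of handing it back to a blocking .get().
    group(
        chain(download_file.s(url), unpack_file.s(), parse_file.s(), dump_urls.s())
        for url in urls
    ).delay()

    # Start the watcher that picks up the JSON files as they appear.
    check_dir.delay()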