
I'm stuck with a relatively complex Celery chain configuration. Here is what I'm trying to achieve. Assume there's a chain of tasks like this:

import celery

chain1 = celery.chain(
    DownloadFile.s("http://someserver/file.gz"),  # downloads the file, returns a temp file name
    UnpackFile.s(),  # unpacks the gzip-compressed file, returns a temp file name
    ParseFile.s(),   # parses the file, returns a list of URLs to download
)
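In a Celery chain, each task's return value is passed as the first positional argument to the next signature. The plain-Python model below involves no Celery at all: `run_chain` and the stub tasks are hypothetical stand-ins that only make the flow of temp file names easy to follow. It ignores any extra partial arguments a real signature might carry.

```python
# Hypothetical stand-ins for the real tasks: each one just tags its input.
def download_file(url):
    return "tmp_" + url.rsplit("/", 1)[-1]

def unpack_file(filename):
    return "unpacked_" + filename

def parse_file(filename):
    return [filename + "_url" + str(i) for i in range(3)]

def run_chain(first_args, steps):
    """Model of a chain: call the first step with its own arguments, then
    feed each result in as the single argument of the next step."""
    first, *rest = steps
    result = first(*first_args)
    for step in rest:
        result = step(result)
    return result

urls = run_chain(("http://someserver/file.gz",), [download_file, unpack_file, parse_file])
print(urls)
```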

Now I want to download each URL in parallel, so what I did was:

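urls = chain1.apply_async().get()  # run the chain and block until ParseFile returns
download_tasks = [DownloadFile.s(url) for url in urls]
res1 = celery.group(download_tasks)()
res1_data = res1.get()  # list of temp file names, one per URL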

Finally, I want to take each downloaded file (DownloadFile returns a temp file name) and run it through another chain of tasks, in parallel (i.e. a group of chains):

chains = []
for tmpfile in res1_data:  # temp file names returned by the download group
    chains.append(celery.chain(
        foo.s(tmpfile),
        bar.s(),
        baz.s()
    ))

res2 = celery.group(*chains)()
res2_data = res2.get()
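Conceptually, `group(*chains)` runs every chain independently, and `res2.get()` returns their results in the same order the chains were given. A rough thread-pool analogy in plain Python (no broker or workers; `foo`, `bar`, `baz` and the helper names are hypothetical stubs) looks like this:

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stub tasks: each prefixes its input.
def foo(name):
    return "foo_" + name

def bar(name):
    return "bar_" + name

def baz(name):
    return "baz_" + name

def run_chain(value, steps):
    # Each step receives the previous step's return value.
    for step in steps:
        value = step(value)
    return value

def run_group_of_chains(inputs, steps):
    # One "chain" per input, executed concurrently; pool.map keeps the input
    # order, much like a group's results come back in signature order.
    with ThreadPoolExecutor(max_workers=4) as pool:
        return list(pool.map(lambda item: run_chain(item, steps), inputs))

res2_data = run_group_of_chains(["tmp1", "tmp2"], [foo, bar, baz])
print(res2_data)
```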

The approach works fine if I run it in a normal Python process (not inside another Celery task), because I can wait for the results of chain1 and then construct the download group and the new chains for each downloaded file.

However, I now want to wrap all of this in another Celery task (a function decorated with @app.task), and it turns out you can't call .get() from inside a task to wait for another task to complete (or at least you really shouldn't). I haven't found a way to "port" this workflow so that it runs inside a task. I tried adding res1 to the chain1 chain, but Celery complains that <GroupResult: ..... > is not JSON serializable.
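The serialization error is expected: a GroupResult is a client-side handle, not data, and the JSON serializer rejects it just as the `json` module rejects any arbitrary object. What can travel between tasks is the group's id, a plain string, which can later be turned back into a handle with `app.GroupResult.restore()` provided the group was saved with `.save()` and a result backend is configured. The class below is a hypothetical stand-in that only illustrates the serialization behaviour:

```python
import json
import uuid

class FakeGroupResult:
    """Hypothetical stand-in for celery.result.GroupResult (illustration only)."""
    def __init__(self):
        self.id = str(uuid.uuid4())

res1 = FakeGroupResult()

# Passing the handle itself fails, like the error in the question.
try:
    json.dumps({"downloads": res1})
    serializable = True
except TypeError as exc:
    serializable = False
    print("not serializable:", exc)

# Passing only the id works, because it is a plain string.
payload = json.dumps({"group_id": res1.id})
print(json.loads(payload)["group_id"] == res1.id)
```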

Can anybody suggest a way to make it work? Thanks!

  • Are you interested in the results of your foo/bar/baz processing? I mean, is it sufficient to know that the processing has been done (for example, you clean the file and save it somewhere)? Commented Feb 14, 2017 at 16:15

1 Answer


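Indeed, it's bad practice to call .get() inside a task (recent Celery versions even raise a RuntimeError by default when you do). The goal of Celery is to run asynchronous tasks in parallel, so a task should not block waiting for the results of other tasks.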
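The "don't wait, enqueue" idea can be modelled in plain Python: instead of a parent blocking on `.get()`, each step schedules the next step when it finishes, and a worker loop just drains a queue. The `deque` queue, `enqueue`, `worker_loop` and the stub tasks below are hypothetical stand-ins for the broker and the workers, not Celery APIs:

```python
from collections import deque

queue = deque()   # stands in for the broker
results = []      # stands in for wherever the final output is stored

def enqueue(func, *args):
    queue.append((func, args))

def parse_file(filename):
    urls = [filename + "_" + str(i) for i in range(3)]
    # Fan out without waiting: schedule one follow-up task per URL.
    for url in urls:
        enqueue(process_url, url)

def process_url(url):
    results.append("processed_" + url)

def worker_loop():
    # A worker pops tasks in order; no task ever blocks on another one.
    while queue:
        func, args = queue.popleft()
        func(*args)

enqueue(parse_file, "file1")
worker_loop()
print(results)
```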

One way to solve your problem is to store the URLs produced by your first processing step somewhere persistent (either in files or in a database).

I wrote a short example of what you can do by writing the results to files, serialized as JSON.

Suppose you have a list of URLs in your main script. First, you launch asynchronous processing of all those URLs with a group of chains. These tasks process the URLs and store the resulting lists of URLs to download in files located in the specified tmp directory.

Then you also launch the check_dir task, which checks whether files have been written to the tmp directory; if so, it processes every file and the corresponding file is then deleted from the directory.
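One caveat of polling a directory: check_dir could pick up a file that parse_file is still writing, or two workers could grab the same file. A common mitigation, sketched here with the standard library only, is to write under a temporary name and atomically rename it into place, and to "claim" a file by renaming it before processing it. The `.json`/`.tmp`/`.claimed` suffixes and the helper names are assumptions, and the renames are only atomic within a single filesystem:

```python
import glob
import json
import os
import tempfile

def write_url_list(directory, name, urls):
    # Write under a temporary name first, then atomically move it into place,
    # so a reader never sees a half-written .json file.
    tmp_path = os.path.join(directory, name + ".tmp")
    final_path = os.path.join(directory, name + ".json")
    with open(tmp_path, "w") as f:
        json.dump({"files": urls}, f)
    os.replace(tmp_path, final_path)

def claim_ready_files(directory):
    # Rename each ready file before processing it; if another worker renamed
    # it first, the rename fails and the file is simply skipped.
    claimed = []
    for path in glob.glob(os.path.join(directory, "*.json")):
        claimed_path = path + ".claimed"
        try:
            os.rename(path, claimed_path)
        except FileNotFoundError:
            continue
        claimed.append(claimed_path)
    return claimed

with tempfile.TemporaryDirectory() as d:
    write_url_list(d, "file1", ["unzipped_file1_0", "unzipped_file1_1"])
    first = claim_ready_files(d)
    second = claim_ready_files(d)  # nothing left to claim
    with open(first[0]) as f:
        loaded = json.load(f)["files"]
    print(loaded)
```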

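With the parameters I chose, this task retries itself every 30 seconds and never ends (I assumed you had a recurring job to run). Note that for the retries to continue indefinitely, the task needs max_retries=None, since Celery's default is 3 retries. You might want to change this; it's just meant to give you an idea of how you could manage it.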
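If you'd rather not keep a task alive through endless retries, one alternative is to let celery beat trigger check_dir periodically and drop the self.retry(...) call from the task. A sketch of the settings, assuming the `settings` module loaded by `app.config_from_object("settings")`, Celery 4+ lowercase setting names, and that the task is registered as `tasks.check_dir` (the real name depends on how the module is imported; the log below shows `arthur.tasks.check_dir`). The schedule key is an arbitrary label:

```python
# settings.py (additions; "check-tmp-dir" is an arbitrary label)
beat_schedule = {
    "check-tmp-dir": {
        "task": "tasks.check_dir",       # registered task name (assumed)
        "schedule": 30.0,                # seconds between runs
        "args": ("./data/tmp_dir/",),    # the tmp_dir argument
    },
}
```

You would then run a beat process (for example `celery -A app_module beat`) alongside the worker.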

I ran it from a main script, but you can also wrap it in another Celery task if you want.

app_module.py

from __future__ import absolute_import
from celery import Celery

app = Celery('app')
app.config_from_object("settings")

if __name__ == '__main__':
    app.start()

tasks.py

from celery import group, chain
from app_module import app

import json
import glob
import os

__all__ = ('download_file',
           'unpack_file',
           'parse_file',
           'foo',
           'bar',
           'process_downloaded_file',
           'check_dir',)

path = "./data/tmp_dir/"

@app.task
def download_file(filename):
    return filename

@app.task
def unpack_file(filename):
    return "unzipped_" + filename

@app.task
def parse_file(filename):
    # Fake parse task: instead of returning the list of URLs, it stores it
    # as JSON in the temp directory
    with open(path + filename, "w") as f:  # text mode: json.dump writes str
        d = {"files": [filename + "_" + str(i) for i in range(0, 5)]}
        json.dump(d, f)
    return True

@app.task
def foo(filename):
    return "foo_" + filename

@app.task
def bar(filename):
    return "bar_" + filename


@app.task
def process_downloaded_file(filename):
    # Process one file from the temp directory, then delete it so it is
    # not processed several times
    with open(filename, "r") as f:
        d = json.load(f)
    # Launch one chain per URL; g is a GroupResult and is not waited on here
    g = group(chain(download_file.s(url), foo.s(), bar.s()) for url in d["files"]).apply_async()
    os.remove(filename)
    return True

@app.task(bind=True, max_retries=None)  # max_retries=None: retry forever (default is 3)
def check_dir(self, tmp_dir, sleep=30):
    # Check the tmp directory; if files have been written, process every file
    # in it. The task then re-schedules itself every *sleep* seconds
    for f in glob.glob(tmp_dir + "*"):
        process_downloaded_file.delay(f)
    self.retry(args=(tmp_dir, sleep), countdown=sleep)

main.py

from celery import group, chain
from tasks import *

path = "./data/tmp_dir/"
urls = ["file1", "file2"]
group(chain(download_file.s(f), unpack_file.s(), parse_file.s()) for f in urls).apply_async()
check_dir.delay(path)

Console output:

[2017-02-14 18:10:41,630: INFO/MainProcess] Received task: arthur.tasks.download_file[65cb06c6-b8b6-4108-af36-84103037e4a2]
[2017-02-14 18:10:41,632: INFO/MainProcess] Received task: arthur.tasks.download_file[d069e046-4153-4320-8f9d-a22adeeb2827]
[2017-02-14 18:10:41,637: INFO/MainProcess] Received task: arthur.tasks.check_dir[19a1984f-c2ed-4de6-82d6-b5ad5a6bacc5]
[2017-02-14 18:10:41,666: INFO/MainProcess] Received task: arthur.tasks.unpack_file[e9eab102-8ae0-4000-b384-5cfa0e01e805]
[2017-02-14 18:10:41,674: INFO/MainProcess] Task arthur.tasks.download_file[65cb06c6-b8b6-4108-af36-84103037e4a2] succeeded in 0.0389260330703s: u'file1'
[2017-02-14 18:10:41,682: INFO/MainProcess] Received task: arthur.tasks.unpack_file[47b13b21-57e2-44be-82dd-f8e0e1adff2e]
[2017-02-14 18:10:41,689: INFO/MainProcess] Task arthur.tasks.download_file[d069e046-4153-4320-8f9d-a22adeeb2827] succeeded in 0.0534016339807s: u'file2'
[2017-02-14 18:10:41,691: INFO/MainProcess] Received task: arthur.tasks.parse_file[bd3fa287-9cf0-4802-88ca-2593c27af4f7]
[2017-02-14 18:10:41,696: INFO/MainProcess] Task arthur.tasks.unpack_file[47b13b21-57e2-44be-82dd-f8e0e1adff2e] succeeded in 0.00816849502735s: u'unzipped_file2'
[2017-02-14 18:10:41,704: INFO/MainProcess] Received task: arthur.tasks.process_downloaded_file[1b72f409-f5b5-480a-b651-616dc02b2207]
[2017-02-14 18:10:41,706: INFO/MainProcess] Task arthur.tasks.parse_file[bd3fa287-9cf0-4802-88ca-2593c27af4f7] succeeded in 0.00894999306183s: True
[2017-02-14 18:10:41,708: INFO/MainProcess] Task arthur.tasks.check_dir[19a1984f-c2ed-4de6-82d6-b5ad5a6bacc5] retry: Retry in 30s
[2017-02-14 18:10:41,709: INFO/MainProcess] Received task: arthur.tasks.process_downloaded_file[1d8d340f-61f7-4ef3-a90e-913a3bfb5478]
[2017-02-14 18:10:41,713: INFO/MainProcess] Task arthur.tasks.unpack_file[e9eab102-8ae0-4000-b384-5cfa0e01e805] succeeded in 0.044072615914s: u'unzipped_file1'
[2017-02-14 18:10:41,714: INFO/MainProcess] Received task: arthur.tasks.check_dir[19a1984f-c2ed-4de6-82d6-b5ad5a6bacc5] eta:[2017-02-14 17:11:11.692241+00:00]
[2017-02-14 18:10:41,717: INFO/MainProcess] Received task: arthur.tasks.parse_file[e839826a-dfa5-4df0-a716-9c21371c297f]
[2017-02-14 18:10:41,720: INFO/MainProcess] Received task: arthur.tasks.download_file[743153f9-9c92-430e-84f5-7d99a269c104]
[2017-02-14 18:10:41,724: INFO/MainProcess] Task arthur.tasks.process_downloaded_file[1b72f409-f5b5-480a-b651-616dc02b2207] succeeded in 0.0153999190079s: True
[2017-02-14 18:10:41,725: INFO/MainProcess] Task arthur.tasks.parse_file[e839826a-dfa5-4df0-a716-9c21371c297f] succeeded in 0.00395095907152s: True
[2017-02-14 18:10:41,726: INFO/MainProcess] Task arthur.tasks.download_file[743153f9-9c92-430e-84f5-7d99a269c104] succeeded in 0.00449692492839s: u'unzipped_file1_0'
[2017-02-14 18:10:41,727: INFO/MainProcess] Received task: arthur.tasks.download_file[a29470d7-85a0-4a91-a410-2e51cff81cea]
[2017-02-14 18:10:41,728: INFO/MainProcess] Task arthur.tasks.process_downloaded_file[1d8d340f-61f7-4ef3-a90e-913a3bfb5478] succeeded in 0.0129376259865s: True
[2017-02-14 18:10:41,729: INFO/MainProcess] Received task: arthur.tasks.download_file[44a1cc48-52a4-4548-a862-48d402dd92f1]
[2017-02-14 18:10:41,731: INFO/MainProcess] Received task: arthur.tasks.download_file[cedf91b2-6e3f-48c3-880e-b80a1c38efed]
[2017-02-14 18:10:41,733: INFO/MainProcess] Task arthur.tasks.download_file[a29470d7-85a0-4a91-a410-2e51cff81cea] succeeded in 0.003385586082s: u'unzipped_file1_1'
[2017-02-14 18:10:41,734: INFO/MainProcess] Task arthur.tasks.download_file[44a1cc48-52a4-4548-a862-48d402dd92f1] succeeded in 0.00395720102824s: u'unzipped_file1_2'
[2017-02-14 18:10:41,735: INFO/MainProcess] Received task: arthur.tasks.download_file[d93a7260-43dc-4e77-b5ff-ce0e3bc426ce]
[2017-02-14 18:10:41,739: INFO/MainProcess] Task arthur.tasks.download_file[d93a7260-43dc-4e77-b5ff-ce0e3bc426ce] succeeded in 0.00272180500906s: u'unzipped_file1_4'
[2017-02-14 18:10:41,740: INFO/MainProcess] Task arthur.tasks.download_file[cedf91b2-6e3f-48c3-880e-b80a1c38efed] succeeded in 0.00340146606322s: u'unzipped_file1_3'
[2017-02-14 18:10:41,740: INFO/MainProcess] Received task: arthur.tasks.download_file[f1b19d02-a97d-4e32-afde-e39d46d45bad]
[2017-02-14 18:10:41,742: INFO/MainProcess] Received task: arthur.tasks.download_file[4a0bce55-8662-42a6-a19d-3ff33496d7e0]
[2017-02-14 18:10:41,745: INFO/MainProcess] Received task: arthur.tasks.download_file[a759d6a1-a558-46ba-8ee1-2cb28cbe0655]
[2017-02-14 18:10:41,747: INFO/MainProcess] Task arthur.tasks.download_file[f1b19d02-a97d-4e32-afde-e39d46d45bad] succeeded in 0.00358341098763s: u'unzipped_file2_0'
[2017-02-14 18:10:41,748: INFO/MainProcess] Task arthur.tasks.download_file[4a0bce55-8662-42a6-a19d-3ff33496d7e0] succeeded in 0.0044348789379s: u'unzipped_file2_1'
[2017-02-14 18:10:41,749: INFO/MainProcess] Received task: arthur.tasks.foo[e3250c36-92e9-4f53-afef-fe95b035e0dd]
[2017-02-14 18:10:41,752: INFO/MainProcess] Received task: arthur.tasks.download_file[3e9db0d1-31c5-4703-8e9d-c2b9f4237d8d]
[2017-02-14 18:10:41,754: INFO/MainProcess] Task arthur.tasks.download_file[a759d6a1-a558-46ba-8ee1-2cb28cbe0655] succeeded in 0.00349929102231s: u'unzipped_file2_2'
[2017-02-14 18:10:41,755: INFO/MainProcess] Task arthur.tasks.foo[e3250c36-92e9-4f53-afef-fe95b035e0dd] succeeded in 0.00417044304777s: u'foo_unzipped_file1_0'
[2017-02-14 18:10:41,755: INFO/MainProcess] Received task: arthur.tasks.download_file[dcda209f-f4be-4697-84c1-e55a8502a45c]
[2017-02-14 18:10:41,757: INFO/MainProcess] Received task: arthur.tasks.foo[3e9db173-7200-4c46-aade-72be5553b0cf]
[2017-02-14 18:10:41,760: INFO/MainProcess] Task arthur.tasks.download_file[3e9db0d1-31c5-4703-8e9d-c2b9f4237d8d] succeeded in 0.00325334002264s: u'unzipped_file2_3'
[2017-02-14 18:10:41,760: INFO/MainProcess] Task arthur.tasks.download_file[dcda209f-f4be-4697-84c1-e55a8502a45c] succeeded in 0.00384710694198s: u'unzipped_file2_4'
[2017-02-14 18:10:41,761: INFO/MainProcess] Received task: arthur.tasks.foo[d936cddc-027c-4640-8a0b-26a7d9723ccc]
[2017-02-14 18:10:41,764: INFO/MainProcess] Received task: arthur.tasks.foo[11ae4aef-1af9-43a0-94b8-7b95575cd1bc]
[2017-02-14 18:10:41,765: INFO/MainProcess] Task arthur.tasks.foo[3e9db173-7200-4c46-aade-72be5553b0cf] succeeded in 0.00316555600148s: u'foo_unzipped_file1_1'
[2017-02-14 18:10:41,766: INFO/MainProcess] Task arthur.tasks.foo[d936cddc-027c-4640-8a0b-26a7d9723ccc] succeeded in 0.00383736204822s: u'foo_unzipped_file1_2'
[2017-02-14 18:10:41,767: INFO/MainProcess] Received task: arthur.tasks.foo[9e60adad-57e2-4a6e-874d-c687df189714]
[2017-02-14 18:10:41,769: INFO/MainProcess] Received task: arthur.tasks.foo[8b1eebb8-abb0-4223-872c-e9687031380c]
[2017-02-14 18:10:41,771: INFO/MainProcess] Task arthur.tasks.foo[11ae4aef-1af9-43a0-94b8-7b95575cd1bc] succeeded in 0.00347809505183s: u'foo_unzipped_file1_3'
[2017-02-14 18:10:41,772: INFO/MainProcess] Task arthur.tasks.foo[9e60adad-57e2-4a6e-874d-c687df189714] succeeded in 0.00403305899817s: u'foo_unzipped_file1_4'
[2017-02-14 18:10:41,773: INFO/MainProcess] Received task: arthur.tasks.foo[f9c137d7-4087-4519-919d-62bba457747f]
[2017-02-14 18:10:41,775: INFO/MainProcess] Received task: arthur.tasks.foo[2a43d460-aceb-465e-8be5-678cb930a60e]
[2017-02-14 18:10:41,777: INFO/MainProcess] Task arthur.tasks.foo[8b1eebb8-abb0-4223-872c-e9687031380c] succeeded in 0.00311726506334s: u'foo_unzipped_file2_0'
[2017-02-14 18:10:41,778: INFO/MainProcess] Task arthur.tasks.foo[f9c137d7-4087-4519-919d-62bba457747f] succeeded in 0.00378636294045s: u'foo_unzipped_file2_1'
[2017-02-14 18:10:41,778: INFO/MainProcess] Received task: arthur.tasks.bar[770d4cd4-527c-4efe-975f-daf337934c78]
[2017-02-14 18:10:41,780: INFO/MainProcess] Received task: arthur.tasks.foo[c09677f9-183e-43ef-889c-c8b7cab2bd23]
[2017-02-14 18:10:41,783: INFO/MainProcess] Task arthur.tasks.foo[2a43d460-aceb-465e-8be5-678cb930a60e] succeeded in 0.00324743904639s: u'foo_unzipped_file2_2'
[2017-02-14 18:10:41,783: INFO/MainProcess] Task arthur.tasks.bar[770d4cd4-527c-4efe-975f-daf337934c78] succeeded in 0.00382692192215s: u'bar_foo_unzipped_file1_0'
[2017-02-14 18:10:41,784: INFO/MainProcess] Received task: arthur.tasks.foo[1a6294da-8cae-4bf1-9d56-be5972254e07]
[2017-02-14 18:10:41,787: INFO/MainProcess] Received task: arthur.tasks.bar[bd15721f-3bea-4c64-a0c4-59c5c8730171]
[2017-02-14 18:10:41,788: INFO/MainProcess] Task arthur.tasks.foo[c09677f9-183e-43ef-889c-c8b7cab2bd23] succeeded in 0.00343648903072s: u'foo_unzipped_file2_4'
[2017-02-14 18:10:41,789: INFO/MainProcess] Task arthur.tasks.foo[1a6294da-8cae-4bf1-9d56-be5972254e07] succeeded in 0.00413183600176s: u'foo_unzipped_file2_3'
[2017-02-14 18:10:41,790: INFO/MainProcess] Received task: arthur.tasks.bar[29a982bd-2a72-49e7-bc56-0f2a4b2ba947]
[2017-02-14 18:10:41,792: INFO/MainProcess] Received task: arthur.tasks.bar[5944c49d-428d-4237-8777-edec76b36512]
[2017-02-14 18:10:41,794: INFO/MainProcess] Task arthur.tasks.bar[bd15721f-3bea-4c64-a0c4-59c5c8730171] succeeded in 0.0031840458978s: u'bar_foo_unzipped_file1_2'
[2017-02-14 18:10:41,795: INFO/MainProcess] Task arthur.tasks.bar[29a982bd-2a72-49e7-bc56-0f2a4b2ba947] succeeded in 0.00374374503735s: u'bar_foo_unzipped_file1_1'
[2017-02-14 18:10:41,796: INFO/MainProcess] Received task: arthur.tasks.bar[12272aad-f6e6-432e-945a-363a678ba2a8]
[2017-02-14 18:10:41,798: INFO/MainProcess] Task arthur.tasks.bar[5944c49d-428d-4237-8777-edec76b36512] succeeded in 0.00241802399978s: u'bar_foo_unzipped_file1_4'
[2017-02-14 18:10:41,798: INFO/MainProcess] Received task: arthur.tasks.bar[493cc5cc-797b-40f3-87a7-1394af1ae45d]
[2017-02-14 18:10:41,801: INFO/MainProcess] Received task: arthur.tasks.bar[e2925c2b-426d-4076-8a8c-c67c56a2ab8e]
[2017-02-14 18:10:41,803: INFO/MainProcess] Task arthur.tasks.bar[12272aad-f6e6-432e-945a-363a678ba2a8] succeeded in 0.00308170204517s: u'bar_foo_unzipped_file1_3'
[2017-02-14 18:10:41,804: INFO/MainProcess] Task arthur.tasks.bar[493cc5cc-797b-40f3-87a7-1394af1ae45d] succeeded in 0.00375492009334s: u'bar_foo_unzipped_file2_0'
[2017-02-14 18:10:41,804: INFO/MainProcess] Received task: arthur.tasks.bar[c4b4e9de-4ce7-476f-b275-278db3d8099f]
[2017-02-14 18:10:41,807: INFO/MainProcess] Received task: arthur.tasks.bar[b0cdb87c-292f-4f14-975c-c7bd4679373d]
[2017-02-14 18:10:41,808: INFO/MainProcess] Task arthur.tasks.bar[c4b4e9de-4ce7-476f-b275-278db3d8099f] succeeded in 0.00304232595954s: u'bar_foo_unzipped_file2_2'
[2017-02-14 18:10:41,809: INFO/MainProcess] Task arthur.tasks.bar[e2925c2b-426d-4076-8a8c-c67c56a2ab8e] succeeded in 0.00377448496874s: u'bar_foo_unzipped_file2_1'
[2017-02-14 18:10:41,810: INFO/MainProcess] Received task: arthur.tasks.bar[3570e196-7c41-43b1-b7ef-68b2d31f28a2]
[2017-02-14 18:10:41,813: INFO/MainProcess] Task arthur.tasks.bar[b0cdb87c-292f-4f14-975c-c7bd4679373d] succeeded in 0.00181642104872s: u'bar_foo_unzipped_file2_4'
[2017-02-14 18:10:41,813: INFO/MainProcess] Task arthur.tasks.bar[3570e196-7c41-43b1-b7ef-68b2d31f28a2] succeeded in 0.00239081599284s: u'bar_foo_unzipped_file2_3'

3 Comments

Thanks for the detailed answer; I'll accept it because I actually ended up doing something similar. However, one problem remains: Celery apparently doesn't wait for the chains in the group to finish and return their results (the group returns as soon as the chains' tasks have been scheduled to run), so I guess there's no nice and easy way to get the results in this scenario without writing to files or a database.
Indeed, Celery does not wait for the group to finish. But if you need to wait for the results, Celery may not be what you need. I don't see a nice Celery workflow for your scenario, because you have to wait for the first step to finish before starting the second one.
But a nice feature of the group is that you can save and restore it (this requires a result backend). For example, in the task process_downloaded_file you can do g.save() and print(g.id). Then, in another part of the code that is not in a task, you can do g = app.GroupResult.restore(the_group_id) and g.get().
