
I am new to Celery. I want to run demo_task in parallel, but the tasks run sequentially instead. Please let me know if I did something wrong.

import time
from celery import Celery
from celery import group
import pandas as pd

CONFIG = {
    'BROKER_URL': 'redis://localhost:6379/0',
    'CELERY_RESULT_BACKEND': 'redis://localhost:6379/0',
}

app = Celery()
app.config_from_object(CONFIG)


@app.task(name='demo_task')
def demo_task(x, y):
    print("demo_task", x, y)
    pd.DataFrame({"a": [1, 2, 3], "b": [2, 3, 4]}).to_csv(f"demo{x}.csv", index=False)
    print("saved")
    time.sleep(8)


def run_task():
    print("start chain_call")
    t = group(*[demo_task.signature((3, 3)),
                demo_task.signature((4, 4)),
                demo_task.signature((5, 5))]
              ).apply_async()


if __name__ == '__main__':
    run_task()

[Command]

celery -A celery_demo worker -l info --pool=solo --purge

[Log]

[2022-04-22 16:29:51,668: WARNING/MainProcess] Please run `celery upgrade settings path/to/settings.py` to avoid these warnings and to allow a smoother upgrade to Celery 6.0.  
[2022-04-22 16:29:51,668: INFO/MainProcess] Connected to redis://localhost:6379/0  
[2022-04-22 16:29:51,668: INFO/MainProcess] mingle: searching for neighbors  
[2022-04-22 16:29:52,672: INFO/MainProcess] mingle: all alone  
[2022-04-22 16:30:05,602: WARNING/MainProcess]  
[2022-04-22 16:30:05,602: WARNING/MainProcess] 4  
[2022-04-22 16:30:05,602: WARNING/MainProcess]  
[2022-04-22 16:30:05,602: WARNING/MainProcess] 4  
[2022-04-22 16:30:05,602: WARNING/MainProcess] saved  
[2022-04-22 16:30:13,614: INFO/MainProcess] Task demo_task[c017c03e-b49d-4d54-85c5-4af57dd55908] succeeded in 8.016000000061467s: None  
[2022-04-22 16:30:13,614: INFO/MainProcess] Task demo_task[d60071c6-4332-4ec1-88fd-3fce79c06ab5] received  
[2022-04-22 16:30:13,614: WARNING/MainProcess] demo_task  
[2022-04-22 16:30:13,614: WARNING/MainProcess]  
[2022-04-22 16:30:13,614: WARNING/MainProcess] 5  
[2022-04-22 16:30:13,614: WARNING/MainProcess]  
[2022-04-22 16:30:13,614: WARNING/MainProcess] 5  
[2022-04-22 16:30:13,614: WARNING/MainProcess] saved  
[2022-04-22 16:30:21,634: INFO/MainProcess] Task demo_task[d60071c6-4332-4ec1-88fd-3fce79c06ab5] succeeded in 8.015000000130385s: None  
1 Comment

Does the * before the list unpack the list into args? Commented Sep 4, 2022 at 8:16
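To the question in the comment: yes. A minimal sketch in plain Python (no Celery needed) showing what the `*` does; note that Celery's `group` also accepts a single iterable of signatures, so the unpacking is optional there.

```python
def collect(*args):
    # *args gathers whatever positional arguments are passed into a tuple
    return args

sigs = ["sig(3,3)", "sig(4,4)", "sig(5,5)"]

# The * in collect(*sigs) unpacks the list, so each element
# becomes a separate positional argument.
assert collect(*sigs) == ("sig(3,3)", "sig(4,4)", "sig(5,5)")

# Without the *, the whole list is passed as one argument.
assert collect(sigs) == (sigs,)
```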

2 Answers


How do you expect tasks to run in parallel if you use the "solo" pool?

Instead, start with prefork concurrency (the default): celery -A celery_demo worker -l info -c 8

This makes the Celery worker spawn 8 child processes that can execute tasks in parallel. If your machine has more than 8 cores, you can increase that number to N, where N is the number of cores available on the host. I usually go with N-1 to leave the system one spare core for other work.

Prefork concurrency is great for CPU-bound tasks. If your tasks are mostly I/O-bound, give the "gevent" or "eventlet" concurrency type a try.
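For reference, only the pool flag and concurrency value change between the two cases (this assumes gevent is installed separately; it is not bundled with Celery):

```shell
# Prefork (default) pool, 8 worker processes -- suits CPU-bound tasks
celery -A celery_demo worker -l info -c 8

# Gevent pool with many greenlets -- suits I/O-bound tasks
# (requires: pip install gevent)
celery -A celery_demo worker -l info -P gevent -c 100
```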


1 Comment

Now it works on Linux. Thanks.
0

Modify your run_task function

async def run_task():
    print("start chain_call")
    t = await group(*[demo_task.signature((3, 3)),
                      demo_task.signature((4, 4)),
                      demo_task.signature((5, 5))]
                    ).apply_async()

2 Comments

I changed run_task() to asyncio.run(run_task()), and I got the error TypeError: object GroupResult can't be used in 'await' expression
Celery is not yet ready for async/await. Since Celery predates async/await, it has its own way of working with asynchronous execution. I am sure this will change in the (distant) future.
