
I have the following workflow triggered from within a separate task on Celery 5.1.2:

    work_flow = chain([
        task_1.si(args),
        data_group,
        task_2.si(args),
        task_3.si(args),
        task_4.si(args),
    ]).apply_async(link_error=clean_up.s(args))

The variable data_group is defined beforehand:

    data_group = group([
        task_a.si(args),
        task_b.si(args),
    ])

My issue is that failing task_a or task_b with AsyncResult's revoke() method and the SIGUSR1 signal (which raises a SoftTimeLimitExceeded exception in the running task) does not trigger the clean_up task.

I have gone through my Celery logs and there are two warnings besides the SoftTimeLimitExceeded exception that I induced:

    [WARNING/MainProcess] Can't find ChordCounter for Group
    [WARNING/ForkPoolWorker-3] Can't find ChordCounter for Group

I have been looking for similar issues online and reading through the Canvas: Designing Workflows section of the Celery documentation. I saw in the docs that "the return values of a group are not collected to be passed to a linked callback signature" (https://docs.celeryq.dev/en/stable/userguide/canvas.html#group-callbacks-and-error-handling) and was wondering if this explains my issue. If so, I am not sure how to restructure my workflow so that task_a and task_b still run in parallel while a failure in either one triggers my clean_up task.
