0

I’m building an API that registers tasks to Celery, and I want to implement a conditional workflow based on the results of the tasks. Specifically, I need to execute Task B if Task A returns a certain result, and I also want to create an on_success callback to execute a different task when Task A completes successfully.

Example Scenario

  1. Task A: This task processes some data and returns a result (either True or False).
  2. Task B: This task should only execute if Task A returns True.
  3. Task C: This task should run after Task A completes successfully, regardless of its result.

Task Definitions

Here’s how I’ve defined my tasks:

from celery import Celery, chain

app = Celery('my_tasks', broker='redis://localhost:6379/0')

@app.task
def task_a():
    # Simulate processing and returning a result
    result = True  # or False, based on some condition
    return result

@app.task
def task_b():
    return "Task B executed."

@app.task
def task_c():
    return "Task C executed."

API Registration

I want to set up my API in such a way that it registers these tasks with the desired workflow. Here’s what I have in mind for the API:

from flask import Flask, request
from my_tasks import task_a, task_b, task_c

app = Flask(__name__)

@app.route('/register-task', methods=['POST'])
def register_task():
    # Register task_a and create a conditional workflow
    workflow = (
        task_a.s()  # Start with task_a
        | task_b.s()  # This should execute if task_a returns True
        | task_c.s()  # This should always run after task_a
    )

    # Call the workflow
    result = workflow.delay()  # Queue the workflow

    return {"task_id": result.id}, 202

Desired Workflow

I would like to achieve the following:

  • If task_a() returns True, then task_b() should be executed.
  • Regardless of the result of task_a(), task_c() should always run after task_a() completes successfully.

My Questions

  1. How can I conditionally execute task_b() based on the result of task_a() within the API registration?
  2. How can I ensure that task_c() runs after task_a() completes, regardless of its result?
  3. What would be the best way to structure this workflow within my API?

......................................................................................

1 Answer 1

0

I see a couple ways this can be approached depending on what fits your use case best.

Option 1: Use Chains and Modify Task B

In this case, you can execute it as a chain, Task B will receive the output of Task A. You'll have to modify Task B to accept this input, and skip it's tasks based on whether the input is True/False.

An example from the Docs

from celery import chain
from proj.tasks import add, mul

# (4 + 4) * 8 * 10
res = chain(add.s(4, 4), mul.s(8), mul.s(10))
proj.tasks.add(4, 4) | proj.tasks.mul(8) | proj.tasks.mul(10)

https://docs.celeryq.dev/en/latest/userguide/canvas.html#chains

The downside of this approach is that Task B will always be technically run and will need to incorporate logic that doesn't necessarily isolate the logic from the rest of the system.

Option 2: Add a 'Long-running' Management/Supervisor Task

This option is a bit of an anti-pattern, but I've found it useful in the past for monitoring Workflows.

In this case, you'll call this 1 main Task, and this Task will call Task A, wait for it to complete, and based on it's result, Execute Task B, then Task C etc. This new Task generally shouldn't use a prefork worker, but either gevent/eventlet/threads since it's mostly sitting around waiting. Also ensure it won't timeout as it will be running for the entire time of the workflow.

The benefit is that you can implement much more complicated logic if necessary without complicating the individual tasks, but the downside is this extra task will have to be running and you may find error handling to be a pain.

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.