
As step one in my DAG, I am trying to get a list of items from some source, say idList, with a count of, say, 100.

Is it possible in Airflow to process all 100 items in idList with a maximum task concurrency of 4 (4 at a time)? After one task completes, it should pick up the next id from idList and dynamically create a task to process it.

I have tried Dynamic Task Mapping, but it doesn't seem to have a max parallelization/concurrency setting for a specific DAG run.
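For illustration, here is roughly what I tried (the dag_id and task names are placeholders). The mapped task fans out fine, but nothing caps how many instances run at the same time within one DAG run:

from datetime import datetime

from airflow.decorators import dag, task


@dag(dag_id="process_id_list", start_date=datetime(2023, 1, 1), schedule=None)
def process_id_list():

    @task
    def get_id_list():
        # stand-in for fetching ~100 ids from some source
        return list(range(100))

    @task
    def process_id(item_id):
        print(f"processing {item_id}")

    # creates one mapped task instance per id, with no per-DAG-run
    # cap on how many run concurrently
    process_id.expand(item_id=get_id_list())


process_id_list()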

  • I don't think this is currently possible; I just created an issue to propose the feature, and I will try to submit a PR this weekend. (Commented Jan 21, 2023 at 12:58)

2 Answers


In Airflow 2.6, we introduced a new parameter, max_active_tis_per_dagrun, to control mapped task concurrency within the same DAG run.

Here is an example:

import pendulum
import time

from airflow.decorators import dag, task


@dag(
    dag_id='max_active_tis_per_dagrun',
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=None
)
def processing_dag():

    @task
    def get_numbers():
        return list(range(20))

    # at most 2 mapped instances of this task run at once within a single DAG run
    @task(max_active_tis_per_dagrun=2)
    def process(number):
        print(number)
        time.sleep(5)

    numbers = get_numbers()

    process.expand(number=numbers)


my_dag = processing_dag()

You can trigger 4 DAG runs via the UI and check how many mapped tasks run in parallel within each DAG run.
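Note that this parameter differs from the older max_active_tis_per_dag (formerly task_concurrency), which limits the task's running instances across all active DAG runs rather than within a single run.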



You can use pools to limit parallelism. Configure the name of a pool on the mapped task (e.g. pool="max_2"):

import time
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG(dag_id="dynamic_task_mapping_with_pool", start_date=datetime(2023, 1, 1), schedule=None):

    @task
    def list_files():
        return list(range(10))

    @task(pool="max_2")
    def process_files(file):
        print(f"Do something with {file}")
        time.sleep(5)

    process_files.expand(file=list_files())

With a pool of size 2, you'll see the mapped instances progress in batches of 2.
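Note that the pool must exist before the run; you can create it in the UI under Admin → Pools or with the CLI, e.g. airflow pools set max_2 2 "limit mapped tasks". Also keep in mind that a pool is global: its slots are shared by every task instance that uses it, across all DAG runs.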


3 Comments

The pool max_2 is shared across all DAG runs, which is similar to using max_active_tis_per_dag to limit the number of parallel task instances across all runs of the DAG. I don't think this is what he is looking for.
Right, I need this limit per DAG run. There can be many DAG runs at once, but I need a max of 4 per running instance of the DAG.
That seems oddly specific; what's the use case for that? Parallelism is typically limited for resource consumption, but with a per-run limit you can technically still have an unlimited number of task instances running across runs.
