
I want to run tasks in parallel: for each id in the ids list, create a new PythonOperator. How can I get the ids list outside the prepare_parameters function? Or, alternatively, can I build the operations list inside prepare_parameters and then wire up prepare_parameters_operator >> operation >> end_tasks?

def prepare_parameters(**context):
  dag_conf = context['dag_run'].conf
  ids = dag_conf['ids'] if 'ids' in dag_conf else [1,2,3]
  context['ti'].xcom_push(key='ids', value=ids)
  

def do_something(id, **context):
  ### do something###


prepare_parameters_operator = PythonOperator(python_callable=prepare_parameters,
                                             task_id='prepare_parameters',
                                             queue='default',
                                             dag=dag)  

operations = []
for id in ids: ### how to get the ids from Xcom? ###
    operations.append(PythonOperator(python_callable=do_something,
                                     task_id='do_something_{}'.format(id),
                                     queue='default',
                                     op_kwargs={'id': id},
                                     dag=dag))

for operation in operations:
    prepare_parameters_operator >> operation >> end_tasks


  • how/where are you defining dag_conf initially? Commented Jun 6, 2022 at 14:16
  • first line of prepare_parameters (edited now) Commented Jun 6, 2022 at 14:29

2 Answers


FWIW, as of Airflow 2.3.0 you can use Dynamic Task Mapping if you are OK with not having an explicit node for each id. The number of task instances under the "do_something" node changes at runtime based on the output of the "prepare_parameters" task; the number in square brackets next to "do_something" in the Graph view shows how many tasks were mapped.

from pendulum import datetime

from airflow.decorators import dag, task


@task(queue="default")
def prepare_parameters(dag_run=None):
    return dag_run.conf.get("ids", [1, 2, 3])


@task(queue="default")
def do_something(id):
    print(id)


@dag(start_date=datetime(2022, 1, 1), schedule_interval=None)
def prepare_parameters_dag():
    _prepare_parameters_operator = prepare_parameters()
    # expand() maps do_something over the list returned by prepare_parameters,
    # creating one task instance per id at runtime
    _do_something = do_something.expand(id=_prepare_parameters_operator)


_ = prepare_parameters_dag()
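You can pass a custom list via conf when triggering, e.g. `airflow dags trigger prepare_parameters_dag --conf '{"ids": [10, 20]}'`; without conf, the `.get` fallback kicks in. The default logic can be sanity-checked outside Airflow with a plain dict standing in for `dag_run.conf` (the values below are just for illustration):

```python
def resolve_ids(conf):
    # Mirrors prepare_parameters: an explicit "ids" key wins,
    # otherwise fall back to the [1, 2, 3] default
    return conf.get("ids", [1, 2, 3])

print(resolve_ids({}))                 # -> [1, 2, 3]
print(resolve_ids({"ids": [10, 20]}))  # -> [10, 20]
```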

[Screenshot: Graph view showing the mapped "do_something" task with the mapped-task count in square brackets]



This is practically impossible: the DAG structure is built when the file is parsed, before any task has run, so XCom values are not available at parse time to create tasks from. This answer describes the problem in detail: https://stackoverflow.com/a/56556790/2956135

So, I usually approach this parallel processing like this.

# Specify a static number of tasks
WORKERS = 3

def do_something(worker_id, **context):
    dag_conf = context['dag_run'].conf
    ids = dag_conf.get('ids', [1, 2, 3])

    # Find the ids that this worker needs to do something.
    # If worker_id = 1, and ids = [1, 2, 3, 4, 5] then sub_ids = [1, 4]
    sub_ids = [x for x in ids if x % WORKERS == worker_id]

    if len(sub_ids) < 1:
        print('No processes for this worker.')
        return

    # Do something with sub_ids
    for i in sub_ids:
        print('processing...  ', i)
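The modulo split can be verified as plain Python, using the worker count and ids from the comment above:

```python
WORKERS = 3

def partition(ids, worker_id):
    # Each worker takes the ids whose value modulo WORKERS equals its worker_id
    return [x for x in ids if x % WORKERS == worker_id]

ids = [1, 2, 3, 4, 5]
print(partition(ids, 0))  # -> [3]
print(partition(ids, 1))  # -> [1, 4]
print(partition(ids, 2))  # -> [2, 5]
```

Note that the split keys on the id value itself, so skewed ids (e.g. all multiples of 3) would land on a single worker; splitting on the position instead (`i % WORKERS` over `enumerate(ids)`) gives an even distribution regardless of the values.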

Then define the DAG as follows.

If Airflow 2.0+

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

with DAG(...) as dag:
    start = DummyOperator(task_id='start')

    with TaskGroup(group_id='tasks') as task_group:
        for i in range(WORKERS):
            inner_one = PythonOperator(task_id=f'inner_{i}',
                                       python_callable=do_something,
                                       op_kwargs={'worker_id': i})

    end = DummyOperator(task_id='end')

    start >> task_group >> end

If Airflow < 2.0

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

with DAG(...) as dag:
    start = DummyOperator(task_id='start')

    tasks = []
    for i in range(WORKERS):
        tasks.append(PythonOperator(task_id=f'inner_{i}',
                                    python_callable=do_something,
                                    op_kwargs={'worker_id': i}))

    end = DummyOperator(task_id='end')

    start >> tasks >> end

