
I have three tasks: 1. AddEMRStep, 2. Sensor, 3. SQLstep. I just want them to be created for two environments.

with dag:
    run_this_task = PythonOperator(
        task_id = "run_this",
        python_callable=push_to_xcom,
        provide_context=True,
        retries=10,
        retry_delay=timedelta(seconds=1)
    )

    run_this_task2 = PythonOperator(
        task_id = "run_this2",
        python_callable=run_this_func,
        provide_context=True
    )

    run_this_task >> run_this_task2

Now I need to create these DAGs for multiple environments.

I am trying to do something like this:

envs = ["stg","prod"]

How can I use a for loop to make it like this?

with dag:
    run_this_task_stg = PythonOperator(
        task_id = "run_this_task_stg",
        python_callable=push_to_xcom,
        provide_context=True,
        retries=10,
        retry_delay=timedelta(seconds=1)
    )

    run_this_task2_stg = PythonOperator(
        task_id = "run_this_task2_stg",
        python_callable=run_this_func,
        provide_context=True
    )

    run_this_task_prod = PythonOperator(
        task_id = "run_this_task_prod",
        python_callable=push_to_xcom,
        provide_context=True,
        retries=10,
        retry_delay=timedelta(seconds=1)
    )

    run_this_task2_prod = PythonOperator(
        task_id = "run_this_task2_prod",
        python_callable=run_this_func,
        provide_context=True
    )

    start >> run_this_task_stg >> run_this_task2_stg 
    start >> run_this_task_prod >> run_this_task2_prod
  • Do you mean creating 2 separate DAGs for each environment or 1 DAG but just creating the 2 sets of tasks in a for loop? Commented Feb 11, 2022 at 15:40
  • @JoshFell, yes, one DAG, and create two sets of tasks: two for staging and two for production. Commented Feb 11, 2022 at 15:51
  • Can I use a for loop for this? Commented Feb 11, 2022 at 16:03

1 Answer


Absolutely! Try something like this:

from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator


def push_to_xcom():
    ...

def run_this_func():
    ...

dag = DAG(dag_id="loops", start_date=datetime(2022, 1, 1), schedule_interval=None)
envs = ["stg", "prod"]

with dag:
    start = DummyOperator(task_id="start")
    
    for env in envs:
        # Each iteration creates a separate pair of tasks; the f-string keeps task_ids unique.
        run_this_task = PythonOperator(
            task_id=f"run_this_task_{env}",
            python_callable=push_to_xcom,
            retries=10,
            retry_delay=timedelta(seconds=1)
        )

        run_this_task2 = PythonOperator(
            task_id=f"run_this_task2_{env}",
            python_callable=run_this_func,
        )

        # Dependencies are wired per environment inside the loop.
        start >> run_this_task >> run_this_task2

Graph View: (screenshot of the DAG graph, with start fanning out into the stg and prod task chains)
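
Since the question mentions three tasks per environment (AddEMRStep, Sensor, SQLstep), the same loop scales to any number of tasks per iteration. A minimal sketch, assuming hypothetical callables add_emr_step, wait_for_step, and run_sql_step that stand in for your real operator logic:

from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator


# Hypothetical placeholders for the real AddEMRStep / Sensor / SQL-step logic
def add_emr_step():
    ...

def wait_for_step():
    ...

def run_sql_step():
    ...

dag = DAG(dag_id="loops_three_tasks", start_date=datetime(2022, 1, 1), schedule_interval=None)
envs = ["stg", "prod"]

with dag:
    start = DummyOperator(task_id="start")

    for env in envs:
        # f-string task_ids keep each environment's chain unique within the single DAG
        add_step = PythonOperator(task_id=f"add_emr_step_{env}", python_callable=add_emr_step)
        sensor = PythonOperator(task_id=f"sensor_{env}", python_callable=wait_for_step)
        sql_step = PythonOperator(task_id=f"sql_step_{env}", python_callable=run_sql_step)

        start >> add_step >> sensor >> sql_step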

Or with the TaskFlow API:

from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.decorators import task
from airflow.operators.dummy import DummyOperator


dag = DAG(dag_id="loops", start_date=datetime(2022, 1, 1), schedule_interval=None)
envs = ["stg", "prod"]

with dag:
    start = DummyOperator(task_id="start")

    for env in envs:
        # Defining the decorated functions inside the loop gives each environment its own task_id.
        @task(task_id=f"run_this_task_{env}", retries=10, retry_delay=timedelta(seconds=1))
        def push_to_xcom():
            ...

        @task(task_id=f"run_this_task2_{env}")
        def run_this_func():
            ...

        # Calling the decorated functions instantiates the tasks and sets the dependencies.
        start >> push_to_xcom() >> run_this_func()
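
In both versions each environment gets uniquely named tasks, and the dependency chain is set once per loop iteration. XComs are stored per task_id (and run), so the values pushed by the stg tasks and the prod tasks won't overwrite each other.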