
I want to generate multiple Airflow DAGs using one script. The DAG names should be "test_<parameter>". Below is my script:

from datetime import datetime

# Importing Airflow modules
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

# Specifying the default arguments for the DAG
default_args = {
    'owner': 'Test',
    'start_date': datetime.now()
    }

parameter_list = ["abc", "pqr", "xyz"]

for parameter in parameter_list:
    dag = DAG("test_"+parameter,
              default_args=default_args,
              schedule_interval=None)
    dag.doc_md = "This is a test dag"

    # Creating Start Dummy Operator
    start = DummyOperator(
        task_id="start",
        dag=dag)

    # Creating End Dummy Operator
    end = DummyOperator(
        task_id="end",
        dag=dag)

    # Design workflow of tasks in the dag
    end.set_upstream(start)

So in this case, it should create 3 dags: "test_abc", "test_pqr" and "test_xyz".

But when I run the script, only one DAG, "test_xyz", is created. Any insights on how to solve this issue? Thanks in advance :)

3 Answers


Yes, it's possible. You can store the configuration for each DAG in persistent storage (for example, a database), fetch it from there, and save the result in a cache. The cache exists so that the DAG script does not query the database every time it is re-parsed; instead, it reads the cached configuration and refreshes it only after the cache's expiry time has passed. You can refer to this article on how to create a dynamic DAG:

from airflow.models import DAG

for i in range(10):
    dag_id = 'foo_{}'.format(i)
    # Bind each DAG to its own module-level name so Airflow picks it up
    globals()[dag_id] = DAG(dag_id)

In the same way, you can also create dynamic sub-DAGs and dynamic tasks. Hope it helps :-)
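A minimal sketch of the database-plus-cache idea from this answer, using only the standard library. The answer does not say which database or cache it used, so fetch_configs_from_db, CACHE_PATH and CACHE_TTL_SECONDS are hypothetical placeholders, and a JSON file holding an expiry timestamp stands in for the cache.

```python
import json
import os
import tempfile
import time

# Hypothetical cache location and lifetime; adjust to your deployment.
CACHE_PATH = os.path.join(tempfile.gettempdir(), "dag_configs_cache.json")
CACHE_TTL_SECONDS = 300


def fetch_configs_from_db():
    """Hypothetical stand-in for the real database query.

    Returns one config dict per DAG that should be generated.
    """
    return [{"name": "abc"}, {"name": "pqr"}, {"name": "xyz"}]


def load_dag_configs(path=CACHE_PATH, ttl=CACHE_TTL_SECONDS, now=None):
    """Return DAG configs, querying the DB only when the cache has expired."""
    now = time.time() if now is None else now
    try:
        with open(path) as f:
            cached = json.load(f)
        if now < cached["expires_at"]:
            return cached["configs"]  # cache still fresh: skip the DB
    except (OSError, ValueError, KeyError, TypeError):
        pass  # no cache yet, or it is unreadable: fall through to the DB

    configs = fetch_configs_from_db()
    # Store the fresh result together with the time at which it goes stale.
    with open(path, "w") as f:
        json.dump({"expires_at": now + ttl, "configs": configs}, f)
    return configs
```

In the DAG file you would then loop over load_dag_configs() instead of a hard-coded parameter_list, creating (and registering) one DAG per config entry. The now parameter exists only to make the expiry logic easy to test.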


I guess the problem is that the dag variable (along with the 'start' and 'end' tasks) gets overwritten on each pass of the for loop, hence only the last value is retained.

It is odd that, although you can't seem to create DAGs dynamically this way, you can create tasks dynamically in a loop. Maybe that helps:

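from airflow.operators.bash_operator import BashOperator

# Assumes `dag` and a downstream `slack_notification` task already exist
for i in range(3):
    t1 = BashOperator(
        task_id='Success_test' + str(i),
        bash_command='cd home',
        dag=dag)

    slack_notification.set_upstream(t1)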


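You can register the dynamically created DAGs in the global namespace. Airflow only picks up DAG objects that are reachable from the DAG file's top-level (global) names, so each DAG needs its own name there.

For example, inside the loop right after creating the DAG: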

globals()[parameter] = dag
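To see why this works, here is a small stand-alone simulation. FakeDAG and discover_dags are hypothetical stand-ins (not Airflow APIs), and plain dicts play the role of the DAG file's global namespace. The sketch assumes, roughly, that Airflow keeps the DAG instances it finds among the module's top-level names, so rebinding one name in a loop leaves only the last DAG visible.

```python
class FakeDAG:
    """Hypothetical stand-in for airflow.models.DAG, only to illustrate discovery."""

    def __init__(self, dag_id):
        self.dag_id = dag_id


def discover_dags(namespace):
    """Return the ids of all FakeDAG objects found among a namespace's values."""
    return sorted(obj.dag_id for obj in namespace.values() if isinstance(obj, FakeDAG))


parameter_list = ["abc", "pqr", "xyz"]

# Original pattern: the name "dag" is rebound on every iteration,
# so after the loop only the last DAG is reachable from the namespace.
rebound = {}
for parameter in parameter_list:
    rebound["dag"] = FakeDAG("test_" + parameter)

# Fixed pattern: each DAG is stored under its own key, as globals()[...] = dag does.
registered = {}
for parameter in parameter_list:
    dag = FakeDAG("test_" + parameter)
    registered[dag.dag_id] = dag

print(discover_dags(rebound))     # ['test_xyz']
print(discover_dags(registered))  # ['test_abc', 'test_pqr', 'test_xyz']
```

The key used in globals() only has to be unique per DAG; using the dag_id itself is simply a convenient choice.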

1 Comment

It should be globals().
