I am using the Apache Airflow (2.2.3) TaskFlow API together with the TriggerDagRunOperator.

I am trying to dynamically pass a value to the conf argument of TriggerDagRunOperator using Jinja. Please see the simple example below.

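from airflow.decorators import dag, task
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

@dag(dag_id='my_dag_trigger')
def first_taskflow():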

    @task(multiple_outputs=True)
    def create_some_values():

        return {'v1': value1, 'v2': value2}

    @task
    def trigger_dag(**kwargs):

        TriggerDagRunOperator(
            task_id='my_second_dag',
            trigger_dag_id='my_second_dag',  # required: dag_id of the DAG to trigger
            conf={
                'v1': "{{ task_instance.xcom_pull(task_ids='create_some_values', key='v1') }}"
            }).execute(kwargs)

    create_some_values() >> trigger_dag()

first_taskflow()

# ------- DAG TO BE TRIGGERED -------

@dag(dag_id='my_second_dag')
def secondary_taskflow():

    @task()
    def secondary_task(**context):
        print(context['dag_run'].conf.get('v1'))

    secondary_task()

secondary_taskflow()

When the triggered DAG runs, the Jinja expression is not rendered. Instead of the value of v1, the triggered DAG receives the literal string "{{ task_instance.xcom_pull(task_ids='create_some_values', key='v1') }}" in its conf.

Is there a way to pass values dynamically into the conf?

Any help greatly appreciated.
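
One possible explanation, offered as a sketch rather than a confirmed answer: Airflow renders templated fields such as conf when it runs an operator as a task of the DAG, so an operator that is constructed inside a @task function and run by calling execute() directly would likely skip that rendering step. Under that assumption, one workaround is to read the XCom value in plain Python from the task instance in the context and build conf from it. The helper build_trigger_conf and the FakeTaskInstance stub below are hypothetical names used only for illustration; the stub stands in for Airflow's task instance so that the snippet runs without Airflow installed.

```python
from typing import Any, Dict


def build_trigger_conf(task_instance: Any) -> Dict[str, Any]:
    """Build the conf dict by pulling the XCom value directly, without Jinja.

    Hypothetical helper: `task_instance` is expected to expose xcom_pull()
    with the same keyword arguments used in the question.
    """
    return {
        "v1": task_instance.xcom_pull(task_ids="create_some_values", key="v1"),
    }


class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's task instance, for illustration only."""

    def __init__(self, xcoms: Dict[tuple, Any]) -> None:
        # Maps (task_id, key) pairs to stored values.
        self._xcoms = xcoms

    def xcom_pull(self, task_ids: str = None, key: str = "return_value") -> Any:
        # Returns None when nothing was pushed for this task and key.
        return self._xcoms.get((task_ids, key))


fake_ti = FakeTaskInstance({("create_some_values", "v1"): "hello"})
conf = build_trigger_conf(fake_ti)
print(conf)  # prints {'v1': 'hello'}
```

Inside trigger_dag from the question, the same helper could presumably be called as conf=build_trigger_conf(kwargs['ti']), so that the operator receives an already resolved value and no template rendering is needed.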
