
I'm attempting to generate a set of dynamic tasks from an XCOM variable. In the XCOM I'm storing a list, and I want to use each element of the list to dynamically create a downstream task.

My use case is that I have an upstream operator that checks an SFTP server for files and returns a list of file names matching specific criteria. I want to create dynamic downstream tasks for each of the file names returned.

I've simplified it to the example below, and while it works, I feel like it's not an idiomatic Airflow solution. In my real use case, I would write a Python function, called from a PythonOperator, that pulls the value from XCOM and returns it, instead of using the pusher function.

I understand that I could create a custom operator that combines both, but I don't think creating a throwaway operator is good practice, and I'm hoping there's another solution.
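For reference, the version I'd write in practice pulls the list from XCOM rather than calling pusher() directly; something like this, where 'check_sftp' is a placeholder for my real upstream task id:

def puller(**context):
    # Pull the list of file names pushed by the upstream SFTP-check task:
    return context['ti'].xcom_pull(task_ids='check_sftp')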

from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow import DAG
from datetime import datetime, timedelta

default_args = {
    "owner": "test",
    "depends_on_past": False,
    "start_date": datetime(2018, 10, 27),
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "email_on_success": False,
    "retries": 0,
    "provide_context": True
}

dag = DAG("test",  default_args=default_args, schedule_interval="@daily", catchup=False)


def pusher(**context):
    return ['a', 'b', 'c', 'd', 'e']

pusher_task = PythonOperator(
    task_id='pusher_task',
    dag=dag,
    python_callable=pusher  
)

def bash_wrapper(task, **context):
    return BashOperator(
        task_id='dynamic'+task,
        dag=dag,
        bash_command='date'
    )

end = BashOperator(task_id='end', dag=dag, bash_command='echo task has ended')


pusher_task >> [bash_wrapper(task) for task in pusher()] >> end

2 Answers


I wouldn't do what you're trying to achieve mainly because:

  1. An XCOM value is state generated at runtime
  2. The DAG structure is something determined at parse time

Even if you use something like the following to get access to XCOM values generated by some upstream task:

from datetime import datetime

from airflow.models import TaskInstance
from airflow.utils.db import provide_session

dag = DAG(...)

@provide_session
def get_files_list(session):
    execution_date = dag.previous_schedule(datetime.now())

    # Find the previous task instance (upstream_task_id is the id of the XCOM-pushing task):
    ti = session.query(TaskInstance).filter(
        TaskInstance.dag_id == dag.dag_id,
        TaskInstance.execution_date == execution_date,
        TaskInstance.task_id == upstream_task_id).first()
    if ti:
        files_list = ti.xcom_pull()
        if files_list:
            return files_list
    # Return a default state:
    return {...}


files_list = get_files_list()
# Generate tasks based on the upstream task's state:
task = PythonOperator(
    ...
    xcom_push=True,
    dag=dag)

But this would behave very strangely, because DAG parsing and task execution are not synchronised in the way you wish.

If the main reason you want to do this is parallelising file processing, I'd have some static number of processing tasks (determined by the required parallelism) that read the file list from the upstream task's XCOM value and operate on the relevant portion of that list; see the sketch below.
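A minimal sketch of what I mean, reusing pusher_task, end and dag from your example (PARALLELISM and the task/function names here are hypothetical):

PARALLELISM = 3  # hypothetical; set to the required parallelism

def process_portion(task_idx, **context):
    # Pull the full file list pushed by the upstream task:
    files = context['ti'].xcom_pull(task_ids='pusher_task') or []
    # Each worker operates on its own contiguous portion of the list:
    start = int(task_idx * len(files) / PARALLELISM)
    stop = int((task_idx + 1) * len(files) / PARALLELISM)
    for name in files[start:stop]:
        print('processing %s' % name)  # real per-file work goes here

for i in range(PARALLELISM):
    worker = PythonOperator(
        task_id='process_files_%d' % i,
        python_callable=process_portion,
        op_kwargs={'task_idx': i},
        provide_context=True,
        dag=dag,
    )
    pusher_task >> worker >> end

The number of tasks is fixed at parse time, but how much work each one does is decided at runtime from the XCOM value, which keeps the two phases cleanly separated.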

Another option is parallelising file processing using some framework for distributed computations, like Apache Spark.


5 Comments

The major reasons to do this are: 1. parallelism; 2. I have no control over how many files (or how much data in the list) I receive from the upstream task, but I want to ensure all files pass through the downstream tasks.
With the first approach you mentioned, let's assume there are 5 files and two processing tasks. After the processing tasks handle file1 and file2 from the list, how can I trigger the same tasks again for file3 and file4? And at the end, how can I trigger one processing task alone for file5? Any example in this direction would be helpful.
@nightgaunt each processing task takes a portion of the files array using files[int(task_idx * len(files) / parallelism):int((task_idx + 1) * len(files) / parallelism)]; this way the first task gets files 1-2, while the second one takes files 3 through 5 (see the worked example after these comments).
But a single task needs to handle multiple files by iterating within a single operator, correct? This requires a custom operator which performs the same operation twice or thrice. Don't take it the wrong way: your solution makes sense and is the closest I have to an answer. But if there's a way I can have a task operate on only one file, I'm willing to go with that option.
A solution using subdags to generate tasks dynamically has been brewing in my head for a while. But your explanation about XCOM being runtime state while the DAG structure is determined at parse time is making me hesitant.
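To make the slicing arithmetic from the comment above concrete, here is a small standalone demonstration with 5 files and a parallelism of 2 (plain Python, outside any DAG):

files = ['f1', 'f2', 'f3', 'f4', 'f5']
parallelism = 2

for task_idx in range(parallelism):
    # task_idx 0: files[int(0 * 5 / 2):int(1 * 5 / 2)] == files[0:2] -> ['f1', 'f2']
    # task_idx 1: files[int(1 * 5 / 2):int(2 * 5 / 2)] == files[2:5] -> ['f3', 'f4', 'f5']
    portion = files[int(task_idx * len(files) / parallelism):int((task_idx + 1) * len(files) / parallelism)]
    print(task_idx, portion)

Every file is assigned to exactly one task, and the last task simply receives the larger remainder when the list doesn't divide evenly.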

The simplest way I can think of is to use a branch operator. https://github.com/apache/airflow/blob/master/airflow/example_dags/example_branch_operator.py
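To illustrate the idea (a minimal sketch, not taken from the linked example; the task ids here are hypothetical), a BranchPythonOperator inspects a runtime value and picks which downstream task to follow, but those candidate tasks must already be declared at parse time:

from airflow.operators.python_operator import BranchPythonOperator

def choose_path(**context):
    # Inspect the XCOM value and return the task_id of a pre-declared branch:
    files = context['ti'].xcom_pull(task_ids='pusher_task') or []
    return 'process_files' if files else 'no_files'

branching = BranchPythonOperator(
    task_id='branching',
    python_callable=choose_path,
    provide_context=True,
    dag=dag,
)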

2 Comments

Can you elaborate? How does the branch operator help to access the XCOM variables and set dynamic downstream tasks based on them?
Your example shows how to programmatically create branches; the question was how to dynamically create tasks depending on the result fetched by a previous task at runtime.
