0

I am creating a dag that should do the following:

  • fetch event ids
  • for each event id, fetch event details ( DockerOperator )

The code below is my attempt to do what I want:

from datetime import datetime

from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator



With Dag(
    start_date=datetime(2024, 11, 1),
    schedule="@daily",
):
    task_fetch_ids = PythonOperator(
        task_id="fetch_detail",
        ...)


    task_fetch_detail = DockerOperator(
        task_id="fetch_detail",
        image="image:v1",
        ).expand(
            command=[f"fetch-event --event-id  {event_id}" for event_id in "{{ ti.xcom_pull(task_ids='task_fetch_ids', key='return_value') }}"]
        )


    task_fetch_ids >> task_fetch_detail


The above clearly doesn't work because I am looping through a string. What is the correct syntax ?

1 Answer 1

1

you must adapt the xcom return to the args of the dynamic task mapping operator


from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator

dag = DAG(
    dag_id="docker_dag",
    schedule_interval=None,
    start_date=days_ago(1),
)
with dag:
    def fn_get_work():
        return ["a", "b", "c"]


    get_work_task = PythonOperator(task_id="get_work",
                                   python_callable=fn_get_work
                                   )


    def fn_build(work):
        rst = []
        for i in work:
            rst.append(f"fetch-event --event-id {i}")
        return rst


    build_work_task = PythonOperator(task_id="build_work",
                                     python_callable=fn_build,
                                     op_kwargs={"work": get_work_task.output})

    run_work_task = DockerOperator.partial(
        task_id="run_work",
        image="alpine:3.16.2",
    ).expand(command=build_work_task.output)

    get_work_task >> build_work_task >> run_work_task

Sign up to request clarification or add additional context in comments.

1 Comment

Works like a charm, thanks

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.