1 vote
0 answers
40 views

Airflow ExternalTaskSensor waits forever after TriggerDagRunOperator
I just started learning Airflow and I have a problem with ExternalTaskSensor. I have 2 DAGs: A trigger_dag that waits for a file, ...
Vladan Markov's user avatar
0 votes
0 answers
110 views

I am building an Airflow DAG with a KubernetesPodOperator that I would like to parametrise heavily. I would like to parametrise cmds, image, and the volume_mounts mount path. The first two work as I ...
Maciej Ziaja's user avatar
0 votes
0 answers
137 views

I have these DAG Import Errors on my Airflow UI. I am working on a test server so I have free hand. I have deleted all records from task_instance; dag_run; import_error; serialized_dag; and dag; ...
Rawnauk Chatterjee's user avatar
0 votes
0 answers
25 views

I am trying to create an airflow dag that will have multiple schedules. For the first schedule in a day (6:00 and manual triggers) I need to run Task1 which is my custom written operator with arg=...
JstFlip's user avatar
  • 23
2 votes
1 answer
271 views

Below is a minimal implementation of a branch operator using the taskflow api. The dag will execute either odd_task or even_task based on the string given by branch_on_condition. odd_task or even_task ...
GazYah's user avatar
  • 38
0 votes
1 answer
626 views

I'm trying to code a pipeline that uploads attachments to salesforce. I have a Taskflow function that generates the mappings to be used by the SalesforceBulkOperator. At first I thought that simply ...
displayname's user avatar
0 votes
0 answers
65 views

In my use case I want to trigger another DAG from one DAG. My first DAG is as below :- with DAG( dag_id = "data_insertion", start_date = datetime(2024, 10, 8), schedule_interval = SCHEDULE, ...
Aviator's user avatar
  • 758
0 votes
0 answers
58 views

I am using Apache Airflow (2.2.3) taskflow API in conjunction with the TriggerDagRunOperator. I am trying to dynamically pass a variable to the conf option of TriggerDagRunOperator using jinja. Please ...
SiDoesCoding's user avatar
0 votes
0 answers
95 views

I'm on Airflow 2.10.2. I'm using a DockerOperator to generate a list of configurations (a list of dicts). My goal is to pass those configurations to a TriggerDagRunOperator task, expanding the list to ...
Fernando Petrulio's user avatar
0 votes
1 answer
94 views

I have DAG1 and DAG2. There are tasks in DAG1: t11 >> t12. Tasks in DAG2: t21 >> t22. The settings are such that only one task can be executed at a time. I want tasks in each DAG to run ...
norden87's user avatar
0 votes
1 answer
133 views

I am new to Airflow DAGs. I have a Python script that makes API calls. I have added a raise Exception to my code: if all API calls fail, i.e. status_code != 200, it should raise an exception. However, ...
S_2103's user avatar
  • 1
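A minimal sketch of the check described in this excerpt, in plain Python with no Airflow imports. The names `AllApiCallsFailed` and `count_successes_or_raise` are hypothetical and not taken from the question, and the rest of the asker's script is not shown here, so this only illustrates the "raise if every call returned a non-200 status" idea.

```python
from typing import Iterable


class AllApiCallsFailed(Exception):
    """Hypothetical exception: none of the calls returned HTTP 200."""


def count_successes_or_raise(status_codes: Iterable[int]) -> int:
    """Return the number of 200 responses; raise if there were calls and all failed."""
    codes = list(status_codes)
    successes = sum(1 for code in codes if code == 200)
    # Only raise when at least one call was made and none of them succeeded.
    if codes and successes == 0:
        raise AllApiCallsFailed(f"all {len(codes)} API calls failed: {codes}")
    return successes
```

If a function like this is called from a task's Python callable, the exception has to propagate out of the callable for the task to be marked as failed; an enclosing try/except that swallows it would hide the failure.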
0 votes
0 answers
51 views

I have an Apache Airflow server running version 1.10.10. I'm encountering an issue where, after the DAGs have completed, attempting to view the graph of a previous Run_ID does not display the correct ...
DUGGIRALA DILEEP's user avatar
0 votes
0 answers
159 views

I use Airflow 2.10.3 and I'd like to retrieve information about when a task starts, when it ends, its status (e.g. success, fail) and similar attributes. I found a PostgreSQL database with all the list of ...
SamCle88's user avatar
  • 275
1 vote
1 answer
502 views

I currently have 2 DAGs that contain the same parameter that can be set called run_date. I want the upstream DAG to trigger the downstream DAG using inlet and outlet Datasets which seems intuitive. ...
David Whiting's user avatar
0 votes
1 answer
62 views

How can we pass data by using a custom operator task? Here is my code. The SQL file does not take the value of the key table_name. The rendered template is supposed to be SELECT COUNT(*) FROM product; ...
QDex's user avatar
  • 163
0 votes
0 answers
93 views

I'm working with Apache Airflow 2.9.1 and trying to implement dynamic task group expansion based on the output from a task in a different task group. Here's a sample DAG implementation. In this, I ...
Hemanth's user avatar
  • 171
0 votes
1 answer
85 views

I'm trying the new TaskFlow API. I have 4 @task functions, and in the end I would love to have t1 >> t2 >> t3 >> t4. I end up doing: t1 = task1() t2 = task2(t1) t3 = task3(t2) t4 = task4(t2)...
Ragnar's user avatar
  • 2,704
0 votes
1 answer
76 views

I tried creating a decorator that would automatically assign a pod name based on the wrapped function, something like: def default_kubernetes_task( **task_kwargs: Any, ) -> Callable[[Callable[.....
wasabigeek's user avatar
  • 3,252
1 vote
1 answer
137 views

I am trying to pass the ID of a playlist using an environment variable. It is initially set through the execution of this Python script from the user. def get_input(prompt): return input(prompt) ...
Holly2207's user avatar
0 votes
1 answer
197 views

I am creating a dag that should do the following: fetch event ids for each event id, fetch event details ( DockerOperator ) The code below is my attempt to do what I want: from datetime import ...
lalaland's user avatar
  • 451
0 votes
1 answer
91 views

Hello, I have a use case where I need to run a sequence of transformations on a collection of data. I grouped the sequence of transformations in a task group in order to dynamically map the tg to the ...
Oussama Darrazi's user avatar
1 vote
3 answers
398 views

Let's say that I have the following (simplified) dag: I have a task that returns a series of query parameters values, and I want to spawn a dynamic task instance of httpoperator to do a query like ...
Vito De Tullio's user avatar
0 votes
1 answer
378 views

I'm using Python to test the Airflow DAG. I have an environment variable: AIRFLOW_CONN_DB="postgresql://postgres:[email protected]:6099/db" and I use the following command to get the hook to ...
Марина Лисниченко's user avatar
1 vote
0 answers
308 views

I'm totally new to apache airflow and dbt. I'm running airflow on docker and when I tried to run the dbt demo workflow, I get this error. I've managed to run the dbt model on my local. Any idea what ...
sup3rgiant's user avatar
0 votes
1 answer
46 views

Is there any way to run a particular task of a dag from another dag? I need to clear TaskA from dag1 using dag2. I know there is a cli command that clears and reruns the tasks. But I need it ...
SRV's user avatar
  • 13
0 votes
1 answer
278 views

I have a DAG that runs other DAGs with TriggerDagRunOperator: trigger_1 = TriggerDagRunOperator( task_id='trigger_dag1', trigger_dag_id='job1', wait_for_completion=True, ...
Arti's user avatar
  • 7,842
1 vote
2 answers
653 views

I want to read and make a plot with airflow. The first task is successfully run, but the second (creating the plot) failed. The error says a task exited with the return code Negsignal.SIGABRT and also ...
statsbeginner's user avatar
2 votes
0 answers
32 views

In Airflow, is it possible to control the concurrency of the DAG with a UI param? As far as I know, DAG run parameters are only known at task runtime, so it does not look like a good approach. ...
Guillem Ramirez Miranda's user avatar
0 votes
1 answer
308 views

I would like to improve this DAG if possible. from ... import.... with DAG( ... ) as dag: @task_group(group_id=f"group", dag=dag) def group(dag_instance, dataset, table): ...
toyo123's user avatar
0 votes
0 answers
307 views

I’ve recently started working with Airflow and I’m using Airflow 2.9.1 with the Taskflow API. I’ve created a task group with dynamic task mapping and I’m using task decorators exclusively (no ...
Hemanth's user avatar
  • 171
1 vote
1 answer
1k views

Take this simple dag, in which one task takes the output of another: import datetime from airflow.decorators import dag, task @task def task_that_returns_a_string() -> str: return "a ...
Izaak Cornelis's user avatar
0 votes
1 answer
426 views

I'm experiencing an issue with Apache Airflow where values pushed to XCom in one task are not retrievable in a subsequent task within the same DAG. Here is a minimal example of my code: from airflow ...
daniel guo's user avatar
0 votes
1 answer
105 views

My current DAG looks like: def download_stage(dag_def: DailyDownloadDefinition, **additional_kwargs): ... retries = 5 download = ShortCircuitOperator( task_id=task_id, ...
user21641220's user avatar
0 votes
1 answer
532 views

I am working with Airflow and have created a DAG that uses dynamic task mapping inside a task group. I have two questions: How to remove the line from get_files to process_file_step2? I want ...
Hemanth's user avatar
  • 171
0 votes
1 answer
420 views

I have an airflow job which launches a Spark job in one task and the next task extracts the application logs to find the Spark job application ID. I use Xcom push in the spark-submit task and ...
Ajith Kannan's user avatar
0 votes
1 answer
89 views

I was told to disable the dbt DAG in Airflow, which I did, but the dbt DAG in my case is called by a parent "main" DAG, which calls an "extract" DAG before calling the dbt one. I ...
Jose Pla's user avatar
  • 120
0 votes
3 answers
512 views

In my PostgresOperator, I would like to pass SQL file path as variable to sql parameter. I am reading this file path from a configuration file: sql_execution = PostgresOperator( task_id='...
Hazhir's user avatar
  • 61
-1 votes
1 answer
76 views

This is my dag: dag_pr_api = DAG( dag_id='API-PERSONAL-COMPUTER', default_args=args, schedule_interval='30 07 * * *', start_date=pendulum.yesterday('Europe/Berlin'), catchup=False, ...
maneesh arava's user avatar
3 votes
0 answers
181 views

I have a DAG in apache airflow like so @dag( ) def taskflow(): .... @task() def generate_args(): return arglist run_pod = pod_operator( name="name", ...
srinivas kumar's user avatar
0 votes
1 answer
197 views

I have a number of individual DAGs that are not connected to each other. The one thing that 'connects' them is that they are tagged with US_TAG. I am trying to run a new task after all of these DAGs (...
user21641220's user avatar
0 votes
2 answers
520 views

I have 2 dags that look like this (note that the 2nd dag automatically runs after the first - they are linked by an Airflow Dataset). Is there a way for the 2nd dag to retrieve the value returned by ...
karlk's user avatar
  • 21
0 votes
1 answer
353 views

I have the following use case: I am trying to create an Airflow DAG which will be used to automate historical data load. I am passing 5 arguments to this DAG - jar_path, main_class_name, start_param, ...
Jaipreet's user avatar
0 votes
1 answer
846 views

I'm trying to get my head around the TaskFlow API & XCom in airflow and am getting stuck, hoping someone here can help. I'm using EmrServerlessCreateApplicationOperator and I want to pass a value ...
jamiet's user avatar
  • 12.7k
1 vote
0 answers
145 views

UPDATE. I realise I haven't explained the problem concisely enough so I've posted a revised question at How do I pass xcom to traditional operator from task created using @task decorator? I am using ...
jamiet's user avatar
  • 12.7k
0 votes
1 answer
78 views

I have this function that extracts data from an API, but the API returns about 505,000 rows of data, so it takes a long time to run. Here is my function. Is there a better way to optimize it? It slows ...
Anamsken's user avatar
1 vote
1 answer
269 views

Have been doing an upgrade from Airflow 2.2.2 to 2.8.1. All of my DAGs parse after doing relatively minor modifications, except for one. Using the TaskFlow API, I have a task with this structure: @...
Kevin Languasco's user avatar
0 votes
3 answers
386 views

I have two tasks: task_a and task_b. There are DAG parameters run_task_a and run_task_b that determine whether each task should be run. There is a further parameter that is an input for task_a. Here's ...
dwolfeu's user avatar
  • 1,263
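One way to picture the selection logic in this excerpt is a small plain-Python function that maps the two flags to the task ids that should run. The function name `select_task_ids` is hypothetical, and wiring it into a DAG (for example as the body of a branching task, which in Airflow returns a task id or a list of task ids) is an assumption, since the asker's DAG is not shown here.

```python
from typing import List, Mapping


def select_task_ids(params: Mapping[str, bool]) -> List[str]:
    """Return the ids of the tasks whose run_* flag is set to a truthy value."""
    selected: List[str] = []
    # A missing flag is treated as False (an assumption for this sketch).
    if params.get("run_task_a", False):
        selected.append("task_a")
    if params.get("run_task_b", False):
        selected.append("task_b")
    return selected
```

Keeping the decision in a pure function makes it easy to unit-test separately from the scheduler. How an empty result should be handled depends on the Airflow version and on the DAG layout.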
0 votes
1 answer
758 views

I would like to set up a DAG where all tasks in a single TaskGroup are done before running to the next one. Meaning that in the example (cf screenshot), the Workflow_FRA has to be done with the tasks ...
Antoine F's user avatar
  • 191
-1 votes
1 answer
856 views

The problem is that I have a loop with the generation of several tasks (let's say 2: "task_a" and "task_b"), "task_a" returns some value that I get from XCom, I pass this ...
Alex's user avatar
  • 1
1 vote
1 answer
297 views

I'm having problems with some dags that are running duplicated. Example: I have a dag that runs every hour, generally it runs quickly for a maximum of 15 minutes, but it can happen that it runs for ...
owfgktalef's user avatar