137 questions
1
vote
0
answers
40
views
Airflow ExternalTaskSensor waits forever even though the task has finished
Airflow ExternalTaskSensor waits forever after TriggerDagRunOperator
I just started learning Airflow and I have a problem with ExternalTaskSensor.
I have 2 DAGs:
A trigger_dag that waits for a file, ...
0
votes
0
answers
110
views
How to use Airflow params to template operator fields that receive complex objects
I am building an Airflow DAG with a KubernetesPodOperator that I would like to parametrise heavily.
I would like to parametrise cmds, image, and the volume_mounts mount path. The first two work as I ...
0
votes
0
answers
137
views
Airflow DAG Import errors even after clearing import_error table
I have these DAG Import Errors on my Airflow UI. I am working on a test server so I have free hand. I have deleted all records from task_instance; dag_run; import_error; serialized_dag; and dag; ...
0
votes
0
answers
25
views
Airflow conditional runs based on schedules
I am trying to create an airflow dag that will have multiple schedules.
For the first schedule in a day (6:00 and manual triggers) I need to run Task1 which is my custom written operator with arg=...
2
votes
1
answer
271
views
In Airflow 2.10 can I use dynamic task mapping with BranchPythonOperator?
Below is a minimal implementation of a branch operator using the taskflow api.
The dag will execute either odd_task or even_task based on the string given by branch_on_condition. odd_task or even_task ...
0
votes
1
answer
626
views
PlainXComArg not resolving before being passed to Operator
I'm trying to code a pipeline that uploads attachments to salesforce. I have a Taskflow function that generates the mappings to be used by the SalesforceBulkOperator. At first I thought that simply ...
0
votes
0
answers
65
views
'NoneType' object is not callable [Airflow Dag Error while creating dependencies]
In my use case I want to trigger another DAG from one DAG. My first DAG is as below :-
with DAG(
dag_id = "data_insertion",
start_date = datetime(2024, 10, 8),
schedule_interval = SCHEDULE,
...
0
votes
0
answers
58
views
Apache Airflow Taskflow API: Pass XCOM value using JINJA2 into TriggerDagRunOperator conf
I am using Apache Airflow (2.2.3) taskflow API in conjunction with the TriggerDagRunOperator.
I am trying to dynamically pass a variable to the conf option of TriggerDagRunOperator using jinja. Please ...
0
votes
0
answers
95
views
Airflow DockerOperator, Dynamic Tasks and XCom
I'm on Airflow 2.10.2. I'm using a DockerOperator to generate a list of configurations (a list of dicts). My goal is to pass those configurations to a TriggerDagOperator task, expanding the list to ...
0
votes
1
answer
94
views
Airflow: do not start tasks in DAG1, while DAG2 unfinished
I have DAG1 and DAG2.
There are tasks in DAG1: t11 >> t12.
Tasks in DAG2: t21 >> t22.
The settings are such that only one task can be executed at a time.
I want tasks in each DAG to run ...
0
votes
1
answer
133
views
How to make a DAG fail when we are raising exception in our python code?
I am new to Airflow DAGs. I have a python script to fetch API calls. I have added a raise Exception in my code, if all API calls failed i.e. status_code!=200, then it should raise exception. However, ...
0
votes
0
answers
51
views
Incorrect/empty DAG Run_ID opens when I click on a previous DAG Run_ID
I have an Apache Airflow server running version 1.10.10. I'm encountering an issue where, after the DAGs have completed, attempting to view the graph of a previous Run_ID does not display the correct ...
0
votes
0
answers
159
views
Airflow 2.10.3 how to extract information about task instances?
I use Airflow 2.10.3 and I'd like to retrieve information about when a task starts, when it ends, its status (e.g. success, fail) and similar attributes.
I found a PostgreSQL database with all the list of ...
1
vote
1
answer
502
views
Using data aware scheduling to trigger downstream dags while using metadata contained in extra parameter as parameters for downstream dag
I currently have 2 DAGs that share a settable parameter called run_date. I want the upstream DAG to trigger the downstream DAG using inlet and outlet Datasets, which seems intuitive. ...
0
votes
1
answer
62
views
Pass data to jinja template in .sql file from a CustomOperator(BaseOperator)
How can we pass data by using a custom operator task ?
Here is my code. The sql file does not take the value of the key table_name. The rendered template is supposed to be
SELECT COUNT(*) FROM product;
...
0
votes
0
answers
93
views
How to Dynamically Expand Task Groups Based on Output of Upstream Task from different Task group in Airflow 2.9.1?
I'm working with Apache Airflow 2.9.1 and trying to implement dynamic task group expansion based on the output from a task in a different task group.
Here's a sample DAG implementation. In this, I ...
0
votes
1
answer
85
views
Airflow @task decorator not having the correct order
I'm trying the new TaskFlow API
I have 4 @task functions, and in the end I would love to have t1 >> t2 >> t3 >> t4.
I ended up doing:
t1 = task1()
t2 = task2(t1)
t3 = task3(t2)
t4 = task4(t2)...
0
votes
1
answer
76
views
Is it possible to wrap @task.kubernetes so it assigns a pod name based on the decorated function name?
I tried creating a decorator that would automatically assign a pod name based on the wrapped function, something like:
def default_kubernetes_task(
**task_kwargs: Any,
) -> Callable[[Callable[.....
1
vote
1
answer
137
views
Airflow environment variable is empty
I am trying to pass the ID of a playlist using an environment variable. It is initially set through the execution of this Python script from the user.
def get_input(prompt):
return input(prompt)
...
0
votes
1
answer
197
views
[Airflow]: Dynamic Task Mapping on DockerOperator using Xcoms
I am creating a dag that should do the following:
fetch event ids
for each event id, fetch event details ( DockerOperator )
The code below is my attempt to do what I want:
from datetime import ...
0
votes
1
answer
91
views
Airflow dynamically mapped tasks don't run in sequence
Hello I have a use case where I need to run a sequence of transformations on a collection of data.
I grouped the sequence of transformation in a task group in order to dynamically map the tg to the ...
1
vote
3
answers
398
views
Dynamic Task Mapping for http operator and query string
Let's say that I have the following (simplified) dag: I have a task that returns a series of query parameters values, and I want to spawn a dynamic task instance of httpoperator to do a query like ...
0
votes
1
answer
378
views
Airflow PostgresHook retrieval in dag
I'm using Python to test the Airflow DAG. I have an environment variable:
AIRFLOW_CONN_DB="postgresql://postgres:[email protected]:6099/db"
and I use the following command to get the hook to ...
1
vote
0
answers
308
views
Airflow dbt workflow run failed in docker
I'm totally new to Apache Airflow and dbt. I'm running Airflow on Docker and when I tried to run the dbt demo workflow, I got this error. I've managed to run the dbt model locally. Any idea what ...
0
votes
1
answer
46
views
Run tasks from other dag from a parent dag
Is there any way to run a particular task of a dag from another dag? I need to clear TaskA from dag1 using dag2.
I know there is a cli command that clears and reruns the tasks. But I need it ...
0
votes
1
answer
278
views
Airflow TriggerDagRunOperator randomly failing with parallel run
I have a DAG that runs other DAGs with TriggerDagRunOperator
trigger_1 = TriggerDagRunOperator(
task_id='trigger_dag1',
trigger_dag_id='job1',
wait_for_completion=True,
...
1
vote
2
answers
653
views
Apache Airflow Zombie Job
I want to read and make a plot with airflow. The first task is successfully run, but the second (creating the plot) failed. The error says a task exited with the return code Negsignal.SIGABRT and also ...
2
votes
0
answers
32
views
Is there a way to control maximum dag concurrency from UI param?
In Airflow, is it possible to control the concurrency of the DAG with a UI param?
As far as I know, DAG run parameters are only known at task runtime, so it does not look like a good approach.
...
0
votes
1
answer
308
views
How to create multiple task groups in parallel using the taskgroup decorator in Airflow
I would like to improve this DAG if possible.
from ... import....
with DAG(
...
) as dag:
@task_group(group_id=f"group", dag=dag)
def group(dag_instance, dataset, table):
...
0
votes
0
answers
307
views
How to Handle Skipping Subsequent Tasks for Specific Indices in Dynamic Task Mapping in Airflow Task Groups
I’ve recently started working with Airflow and I’m using Airflow 2.9.1 with the Taskflow API. I’ve created a task group with dynamic task mapping and I’m using task decorators exclusively (no ...
1
vote
1
answer
1k
views
Airflow decorated task type hinting
Take this simple dag, in which one task takes the output of another:
import datetime
from airflow.decorators import dag, task
@task
def task_that_returns_a_string() -> str:
return "a ...
0
votes
1
answer
426
views
Airflow XCom not retrieving values between tasks in DAG
I'm experiencing an issue with Apache Airflow where values pushed to XCom in one task are not retrievable in a subsequent task within the same DAG. Here is a minimal example of my code:
from airflow ...
0
votes
1
answer
105
views
How to set up airflow alerting so it alerts only on last retry attempt?
My current DAG looks like:
def download_stage(dag_def: DailyDownloadDefinition, **additional_kwargs):
...
retries = 5
download = ShortCircuitOperator(
task_id=task_id,
...
0
votes
1
answer
532
views
Airflow - Dynamic mapped Task Group - Removing mapped task dependencies for all the sub task, and access mapped_input in task group directly
I am working with Airflow and have created a DAG that uses dynamic task mapping inside a task group.
I have two questions:
How to remove the line from get_files to process_file_step2?
I want ...
0
votes
1
answer
420
views
Airflow XCom Push from SparkSubmitOperator not working
I have an airflow job which launches a Spark job in one task and the next task extracts the application logs to find the Spark job application ID. I use Xcom push in the spark-submit task and ...
0
votes
1
answer
89
views
In Airflow, how to prevent a DAG from running a child DAG which is disabled in the Airflow UI?
I was told to disable the dbt DAG in Airflow, which I did, but the dbt DAG in my case is called by a parent "main" DAG, which calls an "extract" DAG before calling the dbt one.
I ...
0
votes
3
answers
512
views
Airflow PostgresOperator pass variable to sql parameter
In my PostgresOperator, I would like to pass SQL file path as variable to sql parameter. I am reading this file path from a configuration file:
sql_execution = PostgresOperator(
task_id='...
-1
votes
1
answer
76
views
The DAG is getting triggered immediately when the toggle is ON
This is my dag:
dag_pr_api = DAG(
dag_id='API-PERSONAL-COMPUTER',
default_args=args,
schedule_interval='30 07 * * *',
start_date=pendulum.yesterday('Europe/Berlin'),
catchup=False,
...
3
votes
0
answers
181
views
How do we unit test a nested `@task` decorated method in apache airflow?
I have a DAG in apache airflow like so
@dag(
)
def taskflow():
....
@task()
def generate_args():
return arglist
run_pod = pod_operator(
name="name",
...
0
votes
1
answer
197
views
Can I use Airflow tags to determine if all DAGs with a certain tag have finished running?
I have a number of individual DAGs that are not connected to each other. The one thing that 'connects' them is that they are tagged with US_TAG.
I am trying to run a new task after all of these DAGs (...
0
votes
2
answers
520
views
In Apache Airflow, how can I exchange data between tasks in different dags, using the Airflow TaskFlow API?
I have 2 dags that look like this (note that the 2nd dag automatically runs after the first - they are linked by an Airflow Dataset). Is there a way for the 2nd dag to retrieve the value returned by ...
0
votes
1
answer
353
views
How can I create DatabricksSubmitRunOperator tasks dynamically in Airflow?
I have the following use case:
I am trying to create an Airflow DAG which will be used to automate historical data load. I am passing 5 arguments to this DAG - jar_path, main_class_name, start_param, ...
0
votes
1
answer
846
views
How do I pass xcom to traditional operator from task created using @task decorator?
I'm trying to get my head around the TaskFlow API & XCom in airflow and am getting stuck, hoping someone here can help. I'm using EmrServerlessCreateApplicationOperator and I want to pass a value ...
1
vote
0
answers
145
views
How do I pass xcom to 3rd party operator when using TaskFlow?
UPDATE. I realise I haven't explained the problem concisely enough so I've posted a revised question at How do I pass xcom to traditional operator from task created using @task decorator?
I am using ...
0
votes
1
answer
78
views
Fetching data from an API takes a long time to run because the data is large
I have this function that extracts data from an API, but the API returns about 505,000 rows of data, so it takes a long time to run.
Here is my function. Is there a better way to optimize it? It slows ...
1
vote
1
answer
269
views
"ValueError: non-default argument follows default argument" when upgrading from Airflow 2.2.2 to 2.8.1
Have been doing an upgrade from Airflow 2.2.2 to 2.8.1. All of my DAGs parse after doing relatively minor modifications, except for one.
Using the TaskFlow API, I have a task with this structure:
@...
0
votes
3
answers
386
views
Airflow branching: A task that only sometimes depends on an upstream task
I have two tasks: task_a and task_b. There are DAG parameters run_task_a and run_task_b that determine whether each task should be run. There is a further parameter that is an input for task_a. Here's ...
0
votes
1
answer
758
views
Airflow: Complete all tasks in a TaskGroup before moving on to the next one and avoid dependencies between TaskGroups
I would like to set up a DAG where all tasks in a single TaskGroup are done before moving on to the next one.
Meaning that in the example (cf screenshot), the Workflow_FRA has to be done with the tasks ...
-1
votes
1
answer
856
views
Override TaskGroup in Airflow
The problem is that I have a loop with the generation of several tasks (let's say 2: "task_a" and "task_b"), "task_a" returns some value that I get from XCom, I pass this ...
1
vote
1
answer
297
views
Airflow DAG starts duplicate when execution is slow
I'm having problems with some DAGs that are running as duplicates.
Example: I have a dag that runs every hour, generally it runs quickly for a maximum of 15 minutes, but it can happen that it runs for ...