I want to access the config passed in the request body of this API call:

curl --location 'http://localhost:8080/api/v1/dags/postgres_operator_dag/dagRuns' \
--header 'Content-Type: application/json' \
--data '{
    "conf": {
        "seller_id": "test_seller"
    }
}'

I want to use seller_id in the SQL query of a PostgresOperator:

with DAG(dag_id="postgres_operator_dag", start_date=datetime(2020, 2, 2), schedule_interval=None) as dag:
    # seller_id = context['dag_run'].conf['seller_id']  # Can't access context here

    connection_source_check = PostgresOperator(
        task_id="fetch_connection_details",
        postgres_conn_id="postgres_connection",
        sql=f"""select c.name as connection_name, c.id as connection_id, a.name as source_name, a.id, a.actor_type as
        source_id from "connection" c left join actor a on  a.id=c.source_id where a.name=%(seller_id)s;""",
        parameters={'seller_id': '{{ dag_run.conf["seller_id"] }}'},
        dag=dag
    )

The above task returns no data. But when I hardcode parameters as

parameters={'seller_id': 'test_seller'}

it works. I also tried using a SQL file with params, but that had no effect.

This approach doesn't work either. It treats {{ dag_run.conf["seller_id"] }} as a literal string rather than evaluating the expression to test_seller, so the resulting query is wrong:

with DAG(dag_id="postgres_operator_dag", start_date=datetime(2020, 2, 2), schedule_interval=None) as dag:

    connection_source_check = PostgresOperator(
        task_id="fetch_connection_details",
        postgres_conn_id="postgres_hogwarts_connection",
        sql=f"""select c.name as connection_name, c.id as connection_id, a.name as source_name, a.id, a.actor_type as
            source_id from "connection" c left join actor a on  a.id=c.source_id where a.name= '{{ dag_run.conf["seller_id"] }}' ;""",
        dag=dag
    )

This approach gives a syntax error:

psycopg2.errors.SyntaxError: syntax error at or near "{" LINE 2: ...ft join actor a on a.id=c.source_id where a.name={ dag_run....

with DAG(dag_id="postgres_operator_dag", start_date=datetime(2020, 2, 2), schedule_interval=None) as dag:

    connection_source_check = PostgresOperator(
        task_id="fetch_connection_details",
        postgres_conn_id="postgres_hogwarts_connection",
        sql=f"""select c.name as connection_name, c.id as connection_id, a.name as source_name, a.id, a.actor_type as
            source_id from "connection" c left join actor a on  a.id=c.source_id where a.name={{ dag_run.conf["seller_id"] }};""",
        dag=dag
    )

What is the right approach to access the seller_id value that was passed in the curl request data?

  • why not just put the parameter directly into the SQL query? e.g. sql=f"""select ... from ... where a.name={{ dag_run.conf["seller_id"] }};""" Commented Mar 19, 2023 at 9:28
  • parameters is not one of the template fields, so you cannot put jinja template in that argument. See airflow.apache.org/docs/apache-airflow-providers-postgres/… Commented Mar 19, 2023 at 9:28
  • @mck Already tried with using it directly in the query with {{dag_run.conf['seller_id'] }} but it gives a syntax error - syntax error at or near "{" Any way to build the query from the api data? Commented Mar 19, 2023 at 10:43
  • Can you edit your question to show the complete code that you have tried to run, and the complete error message? Commented Mar 19, 2023 at 11:22
  • @mck Done with adding all the tried ways Commented Mar 21, 2023 at 10:13

1 Answer

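I would first add a debug task to confirm that seller_id is actually being passed to the DAG run's conf:

from airflow.operators.python import PythonOperator

def print_seller_id(**kwargs):
    # dag_run.conf holds the "conf" object sent in the API request body
    seller_id = kwargs['dag_run'].conf.get('seller_id')
    print(f"Rendered seller_id: {seller_id}")

debug_task = PythonOperator(
    task_id='debug_task',
    python_callable=print_seller_id,
    # provide_context=True is only needed on Airflow 1.x; Airflow 2 passes the context automatically
    dag=dag
)

Also, I don't think your sql needs to be an f-string for the Jinja to be evaluated. In an f-string, Python collapses {{ and }} into single braces before Airflow ever sees the template. You can change it as below and try: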

connection_source_check = PostgresOperator(
    task_id="fetch_connection_details",
    postgres_conn_id="postgres_hogwarts_connection",
    # Plain (non-f) triple-quoted string, so the Jinja braces reach Airflow intact
    sql="""select c.name as connection_name, c.id as connection_id, a.name as source_name, a.id, a.actor_type as
        source_id from connection c left join actor a on  a.id=c.source_id where a.name= '{{ dag_run.conf["seller_id"] }}' ;""",
    dag=dag
)
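
To see why the f prefix matters, here is a small standalone sketch in plain Python (no Airflow needed). It only shows how Python treats the braces before the string is handed to the operator. The variable names are made up for illustration, and the WHERE clause is shortened from the query above.

```python
# With the f prefix, Python treats "{{" and "}}" as escaped single braces,
# so the Jinja delimiters are gone before Airflow can render them.
f_sql = f"""where a.name = '{{ dag_run.conf["seller_id"] }}';"""

# Without the f prefix, the string is left untouched and still contains
# the Jinja expression for Airflow to render at run time.
plain_sql = """where a.name = '{{ dag_run.conf["seller_id"] }}';"""

# If an f-string is really needed for other reasons, the braces have to be
# doubled again so that "{{" and "}}" survive the f-string formatting.
escaped_sql = f"""where a.name = '{{{{ dag_run.conf["seller_id"] }}}}';"""

print(f_sql)
print(plain_sql)
print(escaped_sql)
```

The first line printed has single braces, which matches the psycopg2 error near "{" in the question when the value is unquoted, and matches the literal-string behaviour when it is quoted. Note that rendering a request value straight into the SQL text does not escape it, so this is only reasonable when the caller of the API is trusted.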