
In my PostgresOperator, I would like to pass the SQL file path as a variable to the sql parameter. I read this file path from a configuration file:

sql_execution = PostgresOperator(
        task_id='sql_query',
        postgres_conn_id=task_config['postgres_conn'],
        sql=task_config['sql_file']
    )

As far as I understand, the operator treats the variable's content as an SQL statement rather than as a path to an SQL file. The error is as follows:

psycopg2.errors.SyntaxError: syntax error at or near "scripts"
LINE 1: scripts/samples/hello_world.sql

I have tried the following variations:

Defining the full path to the SQL file:

sql_file = os.path.join(os.path.dirname(__file__), task_config['sql_file'])
sql_execution = PostgresOperator(
    task_id=task_id,
    postgres_conn_id=task_config['postgres_conn_id'],
    sql=sql_file,
)

Wrapping the path in an f-string:

sql_file = os.path.join(os.path.dirname(__file__), task_config['sql_file'])
sql_execution = PostgresOperator(
    task_id=task_id,
    postgres_conn_id=task_config['postgres_conn_id'],
    sql=f'{sql_file}',
)

The operator still treats it as an SQL query rather than as a path to an SQL file.

3 Answers


As a workaround, I read the content of the SQL file into a variable:

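import os

from airflow.providers.postgres.operators.postgres import PostgresOperator

def read_sql_file(file_path):
    # Resolve the path relative to the folder that contains this DAG file
    full_path = os.path.join(os.path.dirname(__file__), file_path)
    with open(full_path, 'r') as file:
        return file.read()

sql_content = read_sql_file(task_config['sql_file'])
sql_execution = PostgresOperator(
    task_id=task_id,
    postgres_conn_id=task_config['postgres_conn_id'],
    sql=sql_content,  # the SQL text itself, not the path
)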

This works fine, but I would still like to know if there is a way to pass the SQL file path as a variable.


First, note that PostgresOperator is deprecated in favor of SQLExecuteQueryOperator, so I will use the latter in this explanation.

You don't need to open and read the .sql file yourself. Airflow does this for you as part of executing the task. Assuming the .sql file's path is relative to the folder containing the DAG file, you can simply do:

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

SQLExecuteQueryOperator(
    task_id=task_id,
    conn_id=task_config['postgres_conn_id'],
    sql="myfile.sql"
)

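This works because the operator's template_ext includes .sql: when the value of the templated sql field ends in .sql, Airflow treats it as the name of a Jinja template file, then reads and renders that file's contents before the query runs. Note that if the .sql file is not relative to the DAG's folder, your DAG object needs to define template_searchpath to tell Airflow where to find the file: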

with DAG(
    ...
    template_searchpath=['path_to_file']
) as dag:
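To make that lookup concrete, here is a rough, stdlib-only model of it. resolve_sql_field is a hypothetical name, not an Airflow API. It assumes the search path is the DAG's folder followed by the template_searchpath entries, and it deliberately skips the Jinja rendering that Airflow also applies to the file's contents, so treat it as a sketch of the idea rather than Airflow's implementation.

```python
import tempfile
from pathlib import Path
from typing import Sequence


def resolve_sql_field(value: str, template_ext: Sequence[str], searchpath: Sequence[str]) -> str:
    """Return the file's text if value names a template file, else value itself.

    Hypothetical, simplified model of Airflow's template-file lookup; it does
    not render Jinja syntax inside the file.
    """
    # Only strings ending in a registered extension are treated as file names.
    if not value.endswith(tuple(template_ext)):
        return value
    # Try each directory in order: DAG folder first, then template_searchpath.
    for directory in searchpath:
        candidate = Path(directory) / value
        if candidate.is_file():
            return candidate.read_text()
    raise FileNotFoundError(f"{value!r} not found in {list(searchpath)}")


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as dag_folder:
        sample = Path(dag_folder, "scripts", "samples")
        sample.mkdir(parents=True)
        (sample / "hello_world.sql").write_text("SELECT 'hello world';")
        # A value ending in .sql is replaced by the file's contents.
        print(resolve_sql_field("scripts/samples/hello_world.sql", [".sql"], [dag_folder]))
        # Any other string is passed through unchanged and would run as SQL.
        print(resolve_sql_field("SELECT 1;", [".sql"], [dag_folder]))
```

The pass-through branch is why a path that never goes through this lookup ends up in the database as literal text, which is exactly the "syntax error at or near "scripts"" from the question.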


It turns out the problem was related to using the TaskFlow API rather than using operators directly:

The issue arises because the @task decorator is not intended to be used directly with Airflow operators like TriggerDagRunOperator or PostgresOperator. Instead, these operators should be used directly within the DAG definition without the TaskFlow API's @task decorator.

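So, both of the following versions work fine: one uses the operator directly with the SQL file path, and the other uses the TaskFlow API with the SQL content. The difference is that Airflow renders templated fields (which includes reading a .sql file named in sql) when it runs a task instance, before it calls execute(); calling .execute({}) manually on an operator created inside a @task function skips that rendering, so the file path is sent to Postgres as if it were SQL.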

TaskFlow with SQL content

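from airflow.decorators import task
from airflow.providers.postgres.operators.postgres import PostgresOperator

@task(task_id=task_id)
def sql_task(task_config):
    # read_sql_file() is the helper from the workaround above
    sql_content = read_sql_file(task_config['sql_file'])
    PostgresOperator(
        task_id=task_id,
        postgres_conn_id=task_config['postgres_conn_id'],
        sql=sql_content,  # pass the SQL text, not the path
        parameters=task_config['parameters'],
        autocommit=True,
    ).execute({})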

Direct use of Operator

sql_task = PostgresOperator(
    task_id=task_id,
    postgres_conn_id=task_config['postgres_conn_id'],
    sql=task_config['sql_file'],  # path to the .sql file; read when the task runs
    autocommit=True,
)
task_dict[task_id] = sql_task
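A toy model can show why only the direct-use version accepts a path: in a normal DAG task, templated fields are rendered first and execute() runs afterwards, whereas calling execute() yourself skips the rendering step. ToySQLOperator, render and run_as_scheduled_task are hypothetical names invented for this sketch; none of them is Airflow's API, and the lookup is simplified (no Jinja rendering).

```python
import tempfile
from pathlib import Path
from typing import Sequence


class ToySQLOperator:
    """Hypothetical stand-in for an operator whose 'sql' field is templated.

    Not Airflow code: it only mimics the order 'render templates, then execute'.
    """

    template_ext = (".sql",)

    def __init__(self, sql: str, searchpath: Sequence[str]) -> None:
        self.sql = sql
        self.searchpath = searchpath

    def render(self) -> None:
        # Stand-in for the rendering step Airflow performs before execute().
        if not self.sql.endswith(self.template_ext):
            return
        for directory in self.searchpath:
            candidate = Path(directory) / self.sql
            if candidate.is_file():
                self.sql = candidate.read_text()
                return
        raise FileNotFoundError(self.sql)

    def execute(self) -> str:
        # Return the text that would be sent to the database.
        return self.sql


def run_as_scheduled_task(op: ToySQLOperator) -> str:
    # A regular DAG task: templates are rendered first, then execute() runs.
    op.render()
    return op.execute()


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as dag_folder:
        Path(dag_folder, "hello_world.sql").write_text("SELECT 'hello world';")
        # Like calling .execute({}) inside a @task: the raw path becomes "the SQL".
        print(ToySQLOperator("hello_world.sql", [dag_folder]).execute())
        # Like using the operator directly in the DAG: the file contents are sent.
        print(run_as_scheduled_task(ToySQLOperator("hello_world.sql", [dag_folder])))
```

Under these assumptions the first call yields the literal string hello_world.sql and the second yields the file's SELECT statement, mirroring the failing and working versions above.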
