I'm using Apache Airflow to automate loading data into a PostgreSQL database. My workflow consists of multiple tasks: I fetch customer and order data via an API, insert or update customer records, and then insert order data into the database.
Everything works fine when I execute the tasks sequentially, but when I try to parallelize them, I encounter the following error during the insert operations:
[2024-12-26T17:24:59.503+0000] {db_insert_customer.py:60} ERROR - Failed to insert/update customer 4507****0 - leticia.****@hotmail.com: current transaction is aborted, commands ignored until end of transaction block
Traceback (most recent call last):
  File "/opt/airflow/airflow_files/tasks/data_load/db_insert_customer.py", line 57, in db_insert_customer
    cursor.execute(query, (row['name'], row['email'], row['cpf'], row['phone']))
psycopg2.errors.InFailedSqlTransaction: current transaction is aborted, commands ignored until end of transaction block
Description of the Workflow:
- I retrieve data for a given week.
- I insert customer data into the database (via INSERT ... ON CONFLICT DO UPDATE, so an existing customer is updated rather than duplicated). I'm using the following query, executed row by row in a loop (sketched after this list):
query = sql.SQL("""
    INSERT INTO app_schema.customers (name, email, cpf, phone)
    VALUES (%s, %s, %s, %s)
    ON CONFLICT (cpf) DO UPDATE
    SET email = EXCLUDED.email,
        phone = EXCLUDED.phone,
        name = EXCLUDED.name
""")
- After that, I fetch order data and insert it into the orders table.
- These operations are done in parallel for each week, as each week's data is independent.
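For reference, the customer-insert task executes that query row by row inside a single transaction, roughly like this (a simplified sketch: the DSN is a placeholder for the real Airflow connection, and error handling is reduced to the logging visible in the traceback):

import logging
import psycopg2

def db_insert_customer(rows, query):
    conn = psycopg2.connect("dbname=app user=airflow")  # placeholder; the real DSN comes from an Airflow connection
    cursor = conn.cursor()
    for row in rows:
        try:
            cursor.execute(query, (row['name'], row['email'], row['cpf'], row['phone']))
        except Exception as exc:
            # Errors are logged and the loop keeps going; after the first failure,
            # every later execute() in the same transaction raises InFailedSqlTransaction.
            logging.error("Failed to insert/update customer %s - %s: %s",
                          row['cpf'], row['email'], exc)
    conn.commit()
    cursor.close()
    conn.close()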
However, when the tasks run in parallel, I sometimes hit this aborted-transaction error, and the affected rows never get inserted into the database.
Questions:
- What could be causing the transaction to be aborted when I parallelize the tasks?
- How can I resolve this issue and ensure data is correctly inserted into the database, even with parallel execution?
- Should I consider adding a step to save data into a CSV file before inserting it into the database to avoid these transaction issues? Or would this add unnecessary complexity?
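For question 3, the CSV/staging route I have in mind would look roughly like this (sketch only; customers_staging is a hypothetical staging table, and I would still need an upsert from staging into app_schema.customers afterwards):

import csv
import io

def copy_rows(cursor, rows):
    buf = io.StringIO()
    writer = csv.writer(buf)
    for row in rows:
        writer.writerow([row['name'], row['email'], row['cpf'], row['phone']])
    buf.seek(0)
    # Bulk-load into a staging table with COPY instead of per-row INSERTs.
    cursor.copy_expert(
        "COPY app_schema.customers_staging (name, email, cpf, phone) FROM STDIN WITH (FORMAT csv)",
        buf,
    )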
Additional Information:
- I'm using PostgreSQL as the database.
- I’m using psycopg2 to interact with the database.
- The customer_id field is declared as serial, which might be an issue in parallel transactions.
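For completeness, the weekly branches are wired up in the DAG roughly like this (a minimal sketch assuming Airflow 2.x; the task ids, the week range, and the stub callables are simplified placeholders):

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

# Placeholders standing in for the real fetch/insert callables.
def fetch_week_data(week): ...
def db_insert_customer(week): ...
def db_insert_orders(week): ...

with DAG("weekly_data_load", start_date=datetime(2024, 1, 1), schedule=None, catchup=False):
    for week in range(1, 5):  # one independent branch per week
        fetch = PythonOperator(task_id=f"fetch_week_{week}",
                               python_callable=fetch_week_data, op_kwargs={"week": week})
        customers = PythonOperator(task_id=f"insert_customers_{week}",
                                   python_callable=db_insert_customer, op_kwargs={"week": week})
        orders = PythonOperator(task_id=f"insert_orders_{week}",
                                python_callable=db_insert_orders, op_kwargs={"week": week})
        # Within a week the steps are sequential; across weeks they run concurrently.
        fetch >> customers >> orders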
EDIT: PostgreSQL also returned the following error:
[2024-12-26T18:59:11.974+0000] {db_insert_customer.py:66} ERROR - Failed to insert/update customer 030**** - anyn****[email protected]: deadlock detected
DETAIL: Process 1456 waits for ShareLock on transaction 2230; blocked by process 1458.
Process 1458 waits for ShareLock on transaction 2227; blocked by process 1456.
HINT: See server log for query details.
CONTEXT: while inserting index tuple (103,67) in relation "customers"
[2024-12-26T18:59:11.982+0000] {db_insert_customer.py:66} ERROR - Failed to insert/update customer 030**** - anyn****[email protected]: current transaction is aborted, commands ignored until end of transaction block
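Reading the DETAIL/HINT above, my current guess is that two weekly branches upsert overlapping customers in different orders and end up waiting on each other's row locks. One mitigation I'm experimenting with (untested sketch; same placeholder DSN as above) is sorting each batch by cpf, so every parallel task takes locks in the same order, and committing/rolling back per row so one failure cannot abort the rest of the batch:

import logging
import psycopg2

def db_insert_customer(rows, query):
    conn = psycopg2.connect("dbname=app user=airflow")  # placeholder DSN
    cursor = conn.cursor()
    # Deterministic order across all parallel tasks -> locks acquired in the same order.
    for row in sorted(rows, key=lambda r: r['cpf']):
        try:
            cursor.execute(query, (row['name'], row['email'], row['cpf'], row['phone']))
            conn.commit()  # commit per row so a failure doesn't poison later rows
        except Exception as exc:
            logging.error("Failed to insert/update customer %s: %s", row['cpf'], exc)
            conn.rollback()  # clear the aborted transaction before the next row
    cursor.close()
    conn.close()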
Comment: 1) Are you using UPDATE or INSERT ... ON CONFLICT? 2) Are you using try/except for the psycopg2 queries? 3) I would suggest looking at the Postgres server log to get a more complete picture of what is happening. Add the answers to the preceding as a text update to the question.