I'm using Apache Airflow to automate the process of loading data into a PostgreSQL database. My workflow consists of multiple tasks where I fetch customer and order data via API, insert or update customer records, and then insert order data into the database.
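
For context, the task chain for one week looks roughly like the sketch below (a minimal skeleton using the Airflow TaskFlow API; the DAG name, task signatures, and payloads are placeholders for illustration, not my actual files):

    from datetime import datetime

    from airflow.decorators import dag, task


    @dag(schedule=None, start_date=datetime(2024, 12, 1), catchup=False)
    def weekly_data_load():
        @task
        def fetch_data(week):
            # Placeholder for the API calls that return customer and order payloads.
            return {"customers": [], "orders": []}

        @task
        def db_insert_customer(payload):
            # Upserts customer rows; the real logic lives in db_insert_customer.py.
            return payload

        @task
        def db_insert_orders(payload):
            # Inserts the week's order rows after the customers are in place.
            pass

        db_insert_orders(db_insert_customer(fetch_data("2024-W52")))


    weekly_data_load()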

Everything works fine when I execute the tasks sequentially, but when I try to parallelize them, I encounter the following error during the insert operations:

[2024-12-26T17:24:59.503+0000] {db_insert_customer.py:60} ERROR - Failed to insert/update customer 4507****0 - leticia.****@hotmail.com: current transaction is aborted, commands ignored until end of transaction block
Traceback (most recent call last):
  File "/opt/airflow/airflow_files/tasks/data_load/db_insert_customer.py", line 57, in db_insert_customer
    cursor.execute(query, (row['name'], row['email'], row['cpf'], row['phone']))
psycopg2.errors.InFailedSqlTransaction: current transaction is aborted, commands ignored until end of transaction block
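
For reference, the loop that produces these log lines is shaped roughly like this (a simplified sketch of db_insert_customer; the DSN and connection handling here are placeholders, not the real file):

    import logging

    import psycopg2
    from psycopg2 import sql

    log = logging.getLogger(__name__)

    query = sql.SQL("""
        INSERT INTO app_schema.customers(name, email, cpf, phone)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (cpf) DO UPDATE
        SET email = EXCLUDED.email,
            phone = EXCLUDED.phone,
            name = EXCLUDED.name
    """)


    def db_insert_customer(rows):
        conn = psycopg2.connect("dbname=app user=airflow")  # placeholder DSN
        try:
            with conn.cursor() as cursor:
                for row in rows:
                    try:
                        cursor.execute(query, (row['name'], row['email'], row['cpf'], row['phone']))
                    except psycopg2.Error as exc:
                        # Logging and carrying on leaves the session inside the
                        # aborted transaction, so every following execute() fails
                        # with "current transaction is aborted".
                        log.error("Failed to insert/update customer %s - %s: %s",
                                  row['cpf'], row['email'], exc)
            conn.commit()
        finally:
            conn.close()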

Description of the Workflow:

  1. I retrieve data for a given week.
  2. I insert customer data into the database (using an UPDATE if the customer already exists, to avoid duplication). I'm using the following query:
    query = sql.SQL("""
        INSERT INTO app_schema.customers(name, email, cpf, phone)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (cpf) DO UPDATE
        SET email = EXCLUDED.email,
            phone = EXCLUDED.phone,
            name = EXCLUDED.name
    """)
  3. After that, I fetch order data and insert it into the orders table.
  4. These operations are done in parallel for each week, as each week's data is independent (see the sketch after this list).
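
Here is roughly how the per-week parallelism is wired (a simplified sketch assuming Airflow 2.3+ dynamic task mapping; the week list and task names are placeholders):

    from datetime import datetime

    from airflow.decorators import dag, task


    @dag(schedule=None, start_date=datetime(2024, 12, 1), catchup=False)
    def weekly_data_load_parallel():
        @task
        def list_weeks():
            return ["2024-W50", "2024-W51", "2024-W52"]

        @task
        def load_week(week):
            # fetch -> upsert customers -> insert orders for a single week.
            # Each mapped task instance runs on its own worker, so the database
            # sees several concurrent sessions writing to the same tables.
            pass

        load_week.expand(week=list_weeks())


    weekly_data_load_parallel()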

However, when the tasks run in parallel, the transaction is sometimes aborted and that week's data is not inserted into the database.

Questions:

  1. What could be causing the transaction to be aborted when I parallelize the tasks?
  2. How can I resolve this issue and ensure data is correctly inserted into the database, even with parallel execution?
  3. Should I consider adding a step to save data into a CSV file before inserting it into the database to avoid these transaction issues? Or would this add unnecessary complexity?

Additional Information:

  • I'm using PostgreSQL as the database.
  • I’m using psycopg2 to interact with the database.
  • The customer_id column is defined as serial, which I thought might be an issue with parallel transactions.

EDIT: PostgreSQL also returned the following error:

[2024-12-26T18:59:11.974+0000] {db_insert_customer.py:66} ERROR - Failed to insert/update customer 030**** - anyn****[email protected]: deadlock detected
DETAIL:  Process 1456 waits for ShareLock on transaction 2230; blocked by process 1458.
Process 1458 waits for ShareLock on transaction 2227; blocked by process 1456.
HINT:  See server log for query details.
CONTEXT:  while inserting index tuple (103,67) in relation "customers"
[2024-12-26T18:59:11.982+0000] {db_insert_customer.py:66} ERROR - Failed to insert/update customer 030**** - anyn****[email protected]: current transaction is aborted, commands ignored until end of transaction block
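
If it helps, the following standalone script seems to reproduce the same kind of deadlock outside Airflow, assuming two parallel batches upsert overlapping cpf values in different orders (the DSN, names, and sleep are only there for illustration):

    import threading
    import time

    import psycopg2

    UPSERT = """
        INSERT INTO app_schema.customers(name, email, cpf, phone)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (cpf) DO UPDATE
        SET email = EXCLUDED.email, phone = EXCLUDED.phone, name = EXCLUDED.name
    """

    ROWS = [
        ("Ana", "[email protected]", "11111111111", "1"),
        ("Bia", "[email protected]", "22222222222", "2"),
    ]


    def upsert_batch(name, rows):
        conn = psycopg2.connect("dbname=app user=airflow")  # placeholder DSN
        try:
            with conn, conn.cursor() as cur:   # one transaction per batch, like one task
                for row in rows:
                    cur.execute(UPSERT, row)   # takes a row-level lock on that cpf
                    time.sleep(1)              # widen the window so the interleaving shows up
        except psycopg2.Error as exc:
            print(name, exc)                   # one worker ends with "deadlock detected"
        finally:
            conn.close()


    # One worker processes the rows forwards, the other backwards: each ends up
    # waiting for a row lock the other already holds, and PostgreSQL aborts one.
    t1 = threading.Thread(target=upsert_batch, args=("worker-1", ROWS))
    t2 = threading.Thread(target=upsert_batch, args=("worker-2", list(reversed(ROWS))))
    t1.start(); t2.start()
    t1.join(); t2.join()

As far as I can tell, the cycle only forms because each batch runs as a single transaction and the two sessions acquire row locks in opposite orders, which matches the "Process 1456 waits for ... blocked by process 1458" detail above.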

Comments:

  • 1) Is this "...using an UPDATE if the customer already exists" a separate UPDATE or an INSERT ... ON CONFLICT? 2) Are you using try/except around the psycopg2 queries? 3) I would suggest looking at the PostgreSQL server log to get a more complete picture of what is happening. Please add the answers to the preceding as a text update to the question. Commented Dec 26, 2024 at 20:59
  • I updated the main question to incorporate your questions, thank you! It seems that parallelism is causing blocking in the database. Commented Dec 27, 2024 at 2:25
