Since atomicity is critical, parallelization is not an option.
If performance is of the essence, SQLAlchemy is probably not the most efficient way to insert large amounts of data into the database. Assuming that you are using psycopg2 as your database driver, have a look at the `COPY` command, which is blazingly fast.
The following example combines SQLAlchemy with psycopg2's raw `copy_expert` method. This is the fastest I have ever gotten inserts, reaching a couple of million rows per second.
The syntax of the `COPY` command is described in the PostgreSQL documentation, and `copy_expert` is documented in the psycopg2 documentation.
import csv
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base
# setup testdata
testdata = [
    ["col1", "col2", "text"],
    [21, 50, "mystring"],
    [48, 78, None],
    [49, 78, "string with 'quotes'"],
]
with open("testdata.csv", "w") as csvfile:
writer = csv.writer(csvfile, delimiter=",", quotechar='"')
writer.writerows(testdata)
# Define Testtable
Base = declarative_base()
class TestTable(Base):
    __tablename__ = "mytable"
    col1 = Column(Integer, primary_key=True)
    col2 = Column(Integer)
    text = Column(String)
# interesting code starts here
engine = create_engine("postgresql+psycopg2://lukas:123@localhost/test")
Base.metadata.create_all(bind=engine)
# raw_connection returns a psycopg2 connection which has a copy_expert method
conn = engine.raw_connection()
try:
    with open("testdata.csv", "r", buffering=8192) as f:
        # read the column names from the CSV header row
        cols = [c.strip() for c in f.readline().split(",")]
        f.seek(0)
        cur = conn.cursor()
        # These two lines are the gist of it all
        command = f"""COPY {TestTable.__table__} ({','.join(cols)}) FROM STDIN WITH (FORMAT csv, HEADER, DELIMITER ',', QUOTE '"', NULL '')"""
        cur.copy_expert(command, f)
        cur.close()
    conn.commit()
except Exception:
    conn.rollback()
    raise
finally:
    conn.close()
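If the data is not already in a file on disk, the same copy_expert approach works with an in-memory buffer. This is a minimal sketch of that variant, assuming the same engine and mytable as above; the rows list is made up for illustration.
import csv
import io

rows = [(1, 2, "a"), (3, 4, None)]  # hypothetical rows, same shape as mytable

buf = io.StringIO()
csv.writer(buf).writerows(rows)
buf.seek(0)  # rewind so COPY reads from the beginning of the buffer

conn = engine.raw_connection()
try:
    cur = conn.cursor()
    # no header row in the buffer, so the HEADER option is omitted
    cur.copy_expert(
        "COPY mytable (col1, col2, text) FROM STDIN WITH (FORMAT csv)",
        buf,
    )
    cur.close()
    conn.commit()
except Exception:
    conn.rollback()
    raise
finally:
    conn.close()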
If you are using asyncio, take a look at asyncpg; it is really fast.
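As a minimal sketch of that route, asyncpg's copy_records_to_table method pushes Python tuples through the binary COPY protocol; the DSN and table below simply mirror the example above and are assumptions.
import asyncio
import asyncpg

async def main():
    # assumes the same database and table as in the example above
    conn = await asyncpg.connect("postgresql://lukas:123@localhost/test")
    records = [(60, 70, "async row"), (61, 71, None)]  # hypothetical rows
    try:
        # copy_records_to_table uses the binary COPY protocol under the hood
        await conn.copy_records_to_table(
            "mytable", records=records, columns=["col1", "col2", "text"]
        )
    finally:
        await conn.close()

asyncio.run(main())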