I want to upload a huge number of entries (~600k) into a simple table in a PostgreSQL DB, with one foreign key, a timestamp and 3 floats per entry. However, the core bulk insert described here takes 60 ms per entry, so the whole execution would take about 10 h. I have found out that this is a performance issue of the executemany() method, which has been solved by the execute_values() method in psycopg2 2.7.
The code I run is the following:
# build a huge list of dicts, one dict for each entry
engine.execute(SimpleTable.__table__.insert(),
               values)  # around 600k dicts in a list
I see that this is a common problem, but I have not managed to find a solution in SQLAlchemy itself. Is there any way to tell SQLAlchemy to call execute_values() on some occasions? Is there any other way to implement huge inserts without constructing the SQL statements myself?
Thanks for the help!
Not the answer you are looking for in the sense that this does not address attempting to instruct SQLAlchemy to use the psycopg extras, and requires – sort of – manual SQL, but: you can access the underlying psycopg connections from an engine with raw_connection(), which allows using COPY FROM:
import io
import csv

from psycopg2 import sql


def bulk_copy(engine, table, values):
    csv_file = io.StringIO()
    headers = list(values[0].keys())
    writer = csv.DictWriter(csv_file, headers)
    writer.writerows(values)

    csv_file.seek(0)

    # NOTE: `format()` here is *not* `str.format()`, but
    # `SQL.format()`. Never use plain string formatting.
    copy_stmt = sql.SQL("COPY {} (" +
                        ",".join(["{}"] * len(headers)) +
                        ") FROM STDIN CSV").\
        format(sql.Identifier(str(table.name)),
               *(sql.Identifier(col) for col in headers))

    # Fetch a raw psycopg connection from the SQLAlchemy engine
    conn = engine.raw_connection()
    try:
        with conn.cursor() as cur:
            cur.copy_expert(copy_stmt, csv_file)
        conn.commit()
    except:
        conn.rollback()
        raise
    finally:
        conn.close()
and then
bulk_copy(engine, SimpleTable.__table__, values)
This should be plenty fast compared to executing INSERT statements. Moving 600,000 records on this machine took around 8 seconds, ~13 µs/record. You could also use the raw connection and cursor with the psycopg2.extras package, as sketched below.
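For illustration, a minimal sketch of that alternative using execute_values() from psycopg2.extras over the same kind of raw connection; the bulk_insert_values name and the page_size value are my own choices, not from the question:

from psycopg2 import sql
from psycopg2.extras import execute_values


def bulk_insert_values(engine, table, values, page_size=1000):
    headers = list(values[0].keys())
    rows = [tuple(row[col] for col in headers) for row in values]

    # Compose the statement with sql.Identifier (again, no plain string
    # formatting); execute_values() fills the single %s placeholder with
    # pages of row tuples.
    stmt = sql.SQL("INSERT INTO {} ({}) VALUES %s").format(
        sql.Identifier(str(table.name)),
        sql.SQL(",").join(sql.Identifier(col) for col in headers))

    conn = engine.raw_connection()
    try:
        with conn.cursor() as cur:
            execute_values(cur, stmt.as_string(cur), rows,
                           page_size=page_size)
        conn.commit()
    except:
        conn.rollback()
        raise
    finally:
        conn.close()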
Meanwhile (since SQLAlchemy 1.2.0) this has become possible with the use_batch_mode flag on the create_engine() function. See the docs. It uses the execute_batch() function from psycopg2.extras.
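A minimal sketch of that, with a placeholder connection URL:

from sqlalchemy import create_engine

# use_batch_mode=True makes the psycopg2 dialect run executemany() calls
# through psycopg2.extras.execute_batch() (SQLAlchemy >= 1.2.0)
engine = create_engine(
    "postgresql+psycopg2://user:password@localhost/mydb",  # placeholder URL
    use_batch_mode=True)

engine.execute(SimpleTable.__table__.insert(), values)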