我想条目(600K〜)的数量巨大上传到一个PostgreSQL数据库一个简单的表格,有一个外键,时间戳以及每个条目3浮动。 然而,它需要每每个条目60毫秒执行描述的核心批量插入此处 ,从而整个执行将需要10小时。 我发现,它的性能问题executemany()
方法,但它已经解决了与execute_values()
方法psycopg2 2.7 。
我运行的代码如下:
#build a huge list of dicts, one dict for each entry
engine.execute(SimpleTable.__table__.insert(),
values) # around 600k dicts in a list
我看,这是一个普遍的问题,但我还没有设法找到在SQLAlchemy中本身就是一个解决方案。 有没有办法告诉SQLAlchemy的调用execute_values()
在某些场合? 有没有办法实现巨大的插入,而不通过自己构建SQL语句任何其他方式?
谢谢您的帮助!
不是你在这个意义上找,这并没有解决试图指示SQLAlchemy的使用psycopg演员,需要答案-排序-手动SQL,但是:你可以从发动机与访问底层psycopg连接raw_connection()
,它允许使用COPY FROM :
import io
import csv
from psycopg2 import sql
def bulk_copy(engine, table, values):
csv_file = io.StringIO()
headers = list(values[0].keys())
writer = csv.DictWriter(csv_file, headers)
writer.writerows(values)
csv_file.seek(0)
# NOTE: `format()` here is *not* `str.format()`, but
# `SQL.format()`. Never use plain string formatting.
copy_stmt = sql.SQL("COPY {} (" +
",".join(["{}"] * len(headers)) +
") FROM STDIN CSV").\
format(sql.Identifier(str(table.name)),
*(sql.Identifier(col) for col in headers))
# Fetch a raw psycopg connection from the SQLAlchemy engine
conn = engine.raw_connection()
try:
with conn.cursor() as cur:
cur.copy_expert(copy_stmt, csv_file)
except Exception as e:
conn.rollback()
raise e
else:
conn.commit()
finally:
conn.close()
然后
bulk_copy(engine, SimpleTable.__table__, values)
相比于执行INSERT语句这应该是足够快。 移动60万点的记录本机上花了大约8秒〜为13μs/记录。 你也可以使用带有额外包的原始连接和光标。
同时也成为可能(从SQLAlchemy的1.2.0)与use_batch_mode
的标志create_engine()
函数。 查看文档 。 它使用execute_batch()
从功能psycopg.extras
。