如何使用psycopg2.extras在SQLAlchemy的?如何使用psycopg2.extra

2019-05-12 11:10发布

我想条目(600K〜)的数量巨大上传到一个PostgreSQL数据库一个简单的表格,有一个外键,时间戳以及每个条目3浮动。 然而,它需要每每个条目60毫秒执行描述的核心批量插入此处 ,从而整个执行将需要10小时。 我发现,它的性能问题executemany()方法,但它已经解决了与execute_values()方法psycopg2 2.7 。

我运行的代码如下:

#build a huge list of dicts, one dict for each entry
engine.execute(SimpleTable.__table__.insert(),
               values) # around 600k dicts in a list

我看,这是一个普遍的问题,但我还没有设法找到在SQLAlchemy中本身就是一个解决方案。 有没有办法告诉SQLAlchemy的调用execute_values()在某些场合? 有没有办法实现巨大的插入,而不通过自己构建SQL语句任何其他方式?

谢谢您的帮助!

Answer 1:

不是你在这个意义上找,这并没有解决试图指示SQLAlchemy的使用psycopg演员,需要答案-排序-手动SQL,但是:你可以从发动机与访问底层psycopg连接raw_connection() ,它允许使用COPY FROM :

import io
import csv
from psycopg2 import sql

def bulk_copy(engine, table, values):
    csv_file = io.StringIO()
    headers = list(values[0].keys())
    writer = csv.DictWriter(csv_file, headers)
    writer.writerows(values)

    csv_file.seek(0)

    # NOTE: `format()` here is *not* `str.format()`, but
    # `SQL.format()`. Never use plain string formatting.
    copy_stmt = sql.SQL("COPY {} (" +
                        ",".join(["{}"] * len(headers)) +
                        ") FROM STDIN CSV").\
        format(sql.Identifier(str(table.name)),
               *(sql.Identifier(col) for col in headers))

    # Fetch a raw psycopg connection from the SQLAlchemy engine
    conn = engine.raw_connection()
    try:
        with conn.cursor() as cur:
            cur.copy_expert(copy_stmt, csv_file)

    except Exception as e:
        conn.rollback()
        raise e

    else:
        conn.commit()

    finally:
        conn.close()

然后

bulk_copy(engine, SimpleTable.__table__, values)

相比于执行INSERT语句这应该是足够快。 移动60万点的记录本机上花了大约8秒〜为13μs/记录。 你也可以使用带有额外包的原始连接和光标。



Answer 2:

同时也成为可能(从SQLAlchemy的1.2.0)与use_batch_mode的标志create_engine()函数。 查看文档 。 它使用execute_batch()从功能psycopg.extras



文章来源: How can I use psycopg2.extras in sqlalchemy?