SQLAlchemy, Psycopg2 and Postgresql COPY

2019-01-18 19:46发布

It looks like Psycopg has a custom command for executing a COPY:

psycopg2 COPY using cursor.copy_from() freezes with large inputs

Is there a way to access this functionality from with SQLAlchemy?

6条回答
▲ chillily
2楼-- · 2019-01-18 20:20

accepted answer is correct but if you want more than just the EoghanM's comment to go on the following worked for me in COPYing a table out to CSV...

from sqlalchemy import sessionmaker, create_engine

eng = create_engine("postgresql://user:pwd@host:5432/db")
ses = sessionmaker(bind=engine)

dbcopy_f = open('/tmp/some_table_copy.csv','wb')

copy_sql = 'COPY some_table TO STDOUT WITH CSV HEADER'

fake_conn = eng.raw_connection()
fake_cur = fake_conn.cursor()
fake_cur.copy_expert(copy_sql, dbcopy_f)

The sessionmaker isn't necessary but if you're in the habit of creating the engine and the session at the same time to use raw_connection you'll need separate them (unless there is some way to access the engine through the session object that I don't know). The sql string provided to copy_expert is also not the only way to it, there is a basic copy_to function that you can use with subset of the parameters that you could past to a normal COPY TO query. Overall performance of the command seems fast for me, copying out a table of ~20000 rows.

http://initd.org/psycopg/docs/cursor.html#cursor.copy_to http://docs.sqlalchemy.org/en/latest/core/connections.html#sqlalchemy.engine.Engine.raw_connection

查看更多
Fickle 薄情
3楼-- · 2019-01-18 20:27

You can use:

def to_sql(engine, df, table, if_exists='fail', sep='\t', encoding='utf8'):
    # Create Table
    df[:0].to_sql(table, engine, if_exists=if_exists)

    # Prepare data
    output = cStringIO.StringIO()
    df.to_csv(output, sep=sep, header=False, encoding=encoding)
    output.seek(0)

    # Insert data
    connection = engine.raw_connection()
    cursor = connection.cursor()
    cursor.copy_from(output, table, sep=sep, null='')
    connection.commit()
    cursor.close()

I insert 200000 lines in 5 seconds instead of 4 minutes

查看更多
家丑人穷心不美
4楼-- · 2019-01-18 20:29

If you can get to the engine you have all you need to do this:

engine = create_engine('postgresql+psycopg2://myuser:password@localhost/mydb')
# or 
engine = session.engine
# or any other way you know to get to the engine

Now you can work.

# isolate a connection
connection = engine.connect().connection

# get the cursor
cursor = connection.cursor()

Here are some templates for the COPY statement to use with cursor.copy_expert(), a more complete and flexible option than copy_from() or copy_to() as it is indicated here: http://initd.org/psycopg/docs/cursor.html#cursor.copy_expert.

# to dump to a file
dump_to = """
COPY mytable 
TO STDOUT
WITH (
    FORMAT CSV,
    DELIMITER ',',
    HEADER
);
"""

# to copy from a file:
copy_from = """
COPY mytable 
FROM STDIN
WITH (
    FORMAT CSV,
    DELIMITER ',',
    HEADER
);
"""

Check out what the options above mean and others that may be of interest to your specific situation https://www.postgresql.org/docs/current/static/sql-copy.html.

IMPORTANT NOTE: The link to the documentation of cursor.copy_expert() indicates to use STDOUT to write out to a file and STDIN to copy from a file. But if you look at the syntax on the PostgreSQL manual, you'll notice that you can also specify the file to write to or from directly in the COPY statement. Don't do that, you're likely just wasting your time if you're not running as root (who runs Python as root during development?) Just do what's indicated in the psycopg2's docs and specify STDIN or STDOUT in your statement with cursor.copy_expert(), it should be fine.

# running the copy statement
with open('/path/to/your/data/file.csv') as f:
     cursor.copy_expert(copy_from, file=f)

# don't forget to commit the changes.
connection.commit()
查看更多
聊天终结者
5楼-- · 2019-01-18 20:30

It doesn't look like it.

You may have to just use psycopg2 to expose this functionality and forego the ORM capabilities. I guess I don't really see the benefit of ORM in such an operation anyway since it's a straight bulk insert and dealing with individual objects a la an ORM would not really make a whole lot of sense.

查看更多
仙女界的扛把子
6楼-- · 2019-01-18 20:33

If your engine is configured with a psycopg2 connection string (which is the default, so either "postgresql://..." or "postgresql+psycopg2://..."), you can create a psycopg2 cursor from an SQL Alchemy session using

cursor = session.connection().connection.cursor()

which you can use to execute

cursor.copy_from(...)

The cursor will be active in the same transaction as your session currently is. If a commit or rollback happens, any further use of the cursor with throw a psycopg2.InterfaceError, you would have to create a new one.

查看更多
等我变得足够好
7楼-- · 2019-01-18 20:37

You don't need to drop down to psycopg2, use raw_connection nor a cursor.

Just execute the sql as usual, you can even use bind parameters with text():

engine.execute(text('''copy some_table from :csv
                       delimiter ',' csv'''
                   ).execution_options(autocommit=True),
               csv='/tmp/a.csv')

You can drop the execution_options(autocommit=True) if this PR will be accepted

查看更多
登录 后发表回答