Copying data from S3 to AWS Redshift using Python

Posted 2019-02-06 12:37

I'm having trouble running a COPY command from Python to load data from S3 into Amazon Redshift.
I have the following COPY command:

copy moves from 's3://<my_bucket_name>/moves_data/2013-03-24/18/moves'
credentials 'aws_access_key_id=<key_id>;aws_secret_access_key=<key_secret>'
removequotes
delimiter ',';

When I execute this command in SQL Workbench/J, everything works as expected. However, when I execute it from Python with psycopg2, the command completes without error but no data is loaded.
I tried the following two options (assume the psycopg2 connection itself is fine, because it is):

cursor.execute(copy_command)  
cursor.copy_expert(copy_command, sys.stdout)

Both run without any warning, yet the data isn't loaded.

Ideas?

Thanks

4 answers
等我变得足够好
#2 · 2019-02-06 13:08

If you are using SQLAlchemy, the COPY command will not auto-commit by itself. This worked for me:

from sqlalchemy import create_engine
eng = create_engine(...)
command = """
copy command here
"""
conn = eng.connect()
# Note: the autocommit execution option was deprecated in SQLAlchemy 1.4 and removed in 2.0
result = conn.execution_options(autocommit=True).execute(command)
result.close()
叼着烟拽天下
#3 · 2019-02-06 13:10

I have used this exact setup (psycopg2 + Redshift + COPY) successfully. Did you commit afterwards? SQL Workbench defaults to auto-commit, while psycopg2 opens a transaction by default, so the data won't be visible until you call commit() on your connection.

The full workflow is:

import psycopg2

conn = psycopg2.connect(...)
cur = conn.cursor()
cur.execute("COPY...")
conn.commit()  # without this, the load stays in an uncommitted transaction

I don't believe that copy_expert() or any of the cursor.copy_* commands work with Redshift.
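
For anyone who wants to wrap the workflow above, here is a minimal sketch of a helper that executes a statement, commits on success, and rolls back on failure. The name run_and_commit is made up for illustration; it relies only on the standard DB-API methods (cursor(), execute(), commit(), rollback(), close()) that psycopg2 connections provide.

```python
def run_and_commit(conn, sql):
    """Execute one statement on a DB-API connection and commit it.

    Hypothetical helper: `conn` is assumed to be a psycopg2-style
    connection that opens a transaction implicitly on execute().
    """
    cur = conn.cursor()
    try:
        cur.execute(sql)
        conn.commit()    # makes the loaded rows visible to other sessions
    except Exception:
        conn.rollback()  # do not leave the connection in an aborted transaction
        raise
    finally:
        cur.close()
```

With psycopg2 this would be called as run_and_commit(psycopg2.connect(...), copy_command), where copy_command is the COPY string from the question.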

Luminary・发光体
#4 · 2019-02-06 13:16

The syntax should be similar to that of DDL statements:

# Create table
c.execute('''CREATE TABLE stocks
             (date text, trans text, symbol text, qty real, price real)''')
对你真心纯属浪费
#5 · 2019-02-06 13:18

First, make sure the transaction is committed.

conn = psycopg2.connect(conn_string)
cur = conn.cursor()
cur.execute(copy_cmd_str)
conn.commit()

You can also make sure the transaction is committed by using with blocks, which additionally close the cursor (note that psycopg2 does not close the connection itself when its with block exits):

with psycopg2.connect(conn_string) as conn:
    with conn.cursor() as curs:
        curs.execute(copy_cmd_str)

When the connection's with block exits without an exception, the transaction is committed. If an exception was raised inside the block, the transaction is rolled back.
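
One caveat worth sketching: in psycopg2, leaving the connection's with block ends the transaction but does not close the connection. A possible way to get both, assuming a psycopg2-style connection, is to wrap it in contextlib.closing. The function name copy_in_transaction and its connect parameter (a zero-argument callable that returns a connection) are hypothetical, chosen just for this example.

```python
from contextlib import closing


def copy_in_transaction(connect, sql):
    """Run `sql` in one transaction, then close the connection.

    Hypothetical helper: `connect` is assumed to return a psycopg2-style
    connection whose `with` block commits or rolls back.
    """
    with closing(connect()) as conn:    # calls conn.close() at the end
        with conn:                      # commit on success, rollback on exception
            with conn.cursor() as cur:  # closes the cursor on exit
                cur.execute(sql)
```

With psycopg2 this could be called as copy_in_transaction(lambda: psycopg2.connect(conn_string), copy_cmd_str).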

Second, even an explicit commit does not help if the load takes long enough to exceed connect_timeout, because the commit never gets to run. So if an explicit commit doesn't fix it, try again with an increased timeout.
