AWS Glue to Redshift: duplicate data?

Posted 2019-08-20 03:07

Question:

Here are some bullet points on how I have things set up:

  - I have CSV files uploaded to S3 and a Glue crawler set up to create the table and schema.
  - I have a Glue job set up that writes the data from the Glue table to our Amazon Redshift database using a JDBC connection. The job is also in charge of mapping the columns and creating the Redshift table.
  - Re-running the job produces duplicate rows in Redshift (as expected).

However, is there a way to replace or delete rows before inserting the new data?

Job bookmarks are enabled, but they don't seem to work.

How can I connect to Redshift and delete all the data as part of the job, in Python, before pushing the new data to Redshift?

Answer 1:

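At the time of writing, Glue doesn't support job bookmarks for JDBC sources.

You can implement an upsert (merge) into Redshift in the Glue job with the postactions option: load the data into a staging table, then let Redshift merge it into the destination table (code in Scala):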

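import com.amazonaws.services.glue.DynamicFrame
import com.amazonaws.services.glue.util.JsonOptions

// glueContext, sourceDf and tempDir are assumed to be defined earlier in the job
val fields = sourceDf.columns.mkString(",")

glueContext.getJDBCSink(
  catalogConnection = "RedshiftConnectionTest",
  options = JsonOptions(Map(
    "database" -> "conndb",
    // Glue loads the frame into this staging table first
    "dbtable" -> "staging_schema.staging_table",
    // Then, in one transaction: replace matching rows in the destination and drop the staging table
    "postactions" ->
        s"""
           BEGIN;
           DELETE FROM dst_schema.dst_table USING staging_schema.staging_table AS S WHERE dst_table.id = S.id;
           INSERT INTO dst_schema.dst_table ($fields) SELECT $fields FROM staging_schema.staging_table;
           DROP TABLE IF EXISTS staging_schema.staging_table;
           END;
        """
  )),
  redshiftTmpDir = tempDir,
  transformationContext = "redshift-output"
).writeDynamicFrame(DynamicFrame(sourceDf, glueContext))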
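For a Python (PySpark) Glue job, the same staging-table upsert might be sketched as below. This is a minimal sketch, assuming the job already has a GlueContext and a DynamicFrame; it passes the postactions SQL through glueContext.write_dynamic_frame.from_jdbc_conf. The helper names build_upsert_postactions and write_with_upsert and the default key column id are hypothetical, and the connection, database and table names are placeholders carried over from the Scala snippet.

```python
# Sketch only: helper names and the default key column "id" are hypothetical.


def build_upsert_postactions(fields, staging, target, key="id"):
    """SQL run after the load: replace matching rows in target, then drop staging.

    Table and column names are interpolated as-is, so only pass trusted identifiers.
    """
    cols = ",".join(fields)
    # Mirror the Scala snippet: refer to the target by its bare table name in WHERE
    target_name = target.split(".")[-1]
    return (
        "BEGIN; "
        f"DELETE FROM {target} USING {staging} AS s WHERE {target_name}.{key} = s.{key}; "
        f"INSERT INTO {target} ({cols}) SELECT {cols} FROM {staging}; "
        f"DROP TABLE IF EXISTS {staging}; "
        "END;"
    )


def write_with_upsert(glue_context, frame, connection, database, staging, target, tmp_dir, key="id"):
    """Load the DynamicFrame into the staging table; Redshift then runs the postactions."""
    fields = frame.toDF().columns
    return glue_context.write_dynamic_frame.from_jdbc_conf(
        frame=frame,
        catalog_connection=connection,
        connection_options={
            "database": database,
            "dbtable": staging,
            "postactions": build_upsert_postactions(fields, staging, target, key),
        },
        redshift_tmp_dir=tmp_dir,
        transformation_ctx="redshift-output",
    )
```

In a job script this would be called as write_with_upsert(glueContext, dyf, "RedshiftConnectionTest", "conndb", "staging_schema.staging_table", "dst_schema.dst_table", args["TempDir"]), where dyf is the DynamicFrame read from the crawler's table.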

If you just want to delete the existing rows in the destination table before loading, you can use the preactions parameter instead:

glueContext.getJDBCSink(
  catalogConnection = "RedshiftConnectionTest",
  options = JsonOptions(Map(
    "database" -> "conndb",
    "dbtable" -> "dst_schema.dst_table",
    // Runs before the load: removes all existing rows, so the frame below replaces them
    "preactions" -> "DELETE FROM dst_schema.dst_table"
  )),
  redshiftTmpDir = tempDir,
  transformationContext = "redshift-output"
).writeDynamicFrame(DynamicFrame(sourceDf, glueContext))
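The same full reload from a Python (PySpark) Glue job might look like the sketch below. It assumes an existing GlueContext and DynamicFrame and uses glueContext.write_dynamic_frame.from_jdbc_conf with the preactions connection option; the helper name write_full_reload and its truncate flag are hypothetical, and the names passed in are placeholders from the Scala snippet.

```python
# Sketch only: write_full_reload and its truncate flag are hypothetical helpers.


def write_full_reload(glue_context, frame, connection, database, target, tmp_dir, truncate=False):
    """Empty the target table, then load the DynamicFrame into it."""
    # DELETE marks rows as deleted, and VACUUM later reclaims the space.
    # TRUNCATE is faster and needs no VACUUM, but Redshift commits it immediately.
    preaction = f"TRUNCATE {target};" if truncate else f"DELETE FROM {target};"
    return glue_context.write_dynamic_frame.from_jdbc_conf(
        frame=frame,
        catalog_connection=connection,
        connection_options={
            "database": database,
            "dbtable": target,
            "preactions": preaction,  # executed before the data is loaded
        },
        redshift_tmp_dir=tmp_dir,
        transformation_ctx="redshift-output",
    )
```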


Answer 2:

This works as long as you have a unique key on your tables, ideally an integer primary key. The way I tackle this is as follows:

  1. Use a scheduling tool that can run jobs in order. I recommend Airflow.
  2. Start the Glue job to read from the source and write to a staging table. (The staging table contains only the output of that Glue run, not necessarily all rows.)
  3. Wait for the Glue job to finish (using the scheduling tool).
  4. Run a SQL job on Redshift that:

a) deletes the matching rows from the target table

delete from target
where id in (select id from staging);

b) inserts the data from the staging table into the target table

insert into target select * from staging;

c) truncates the staging table

truncate staging;

d) vacuums and analyzes both tables

vacuum target to 100 percent;
analyze target;
vacuum staging;
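The SQL for steps a) to d) can be generated in a fixed order, for instance by the scheduler task in step 4. This is a minimal sketch, assuming tables named target and staging with a hypothetical id key column; merge_steps is an illustrative helper. It returns two lists because Redshift's TRUNCATE commits the transaction it runs in and VACUUM can't run inside a transaction block, so only a) and b) are meant to be committed together.

```python
# Sketch only: merge_steps is a hypothetical helper; identifiers are not escaped.


def merge_steps(target, staging, key="id"):
    """Return (in_transaction, after_commit) SQL lists for steps a) to d)."""
    in_transaction = [
        # a) delete the rows that are about to be replaced
        f"DELETE FROM {target} WHERE {key} IN (SELECT {key} FROM {staging});",
        # b) copy the new rows; assumes staging and target share the same column order
        f"INSERT INTO {target} SELECT * FROM {staging};",
    ]
    after_commit = [
        # c) TRUNCATE commits implicitly, so run it only after a) and b) are committed
        f"TRUNCATE {staging};",
        # d) VACUUM must run outside a transaction block
        f"VACUUM {target} TO 100 PERCENT;",
        f"ANALYZE {target};",
        f"VACUUM {staging};",
    ]
    return in_transaction, after_commit
```

Run the first list in one transaction and commit it, then run the second list with autocommit enabled.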


Answer 3:

You could use the Python module pg8000 to connect to Redshift from your Glue script and execute SQL that deletes (drops or truncates) the data. pg8000 is pure Python, so it works with Glue.

Check out this link: AWS Glue - Truncate destination postgres table prior to insert

I have tried it and it works fine. Hope this helps.
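A minimal sketch of this approach, assuming pg8000 is made available to the Glue job (for example as an extra Python library) and the job has network access to the cluster. The helper names connect_redshift and run_sql are hypothetical, 5439 is Redshift's default port, and SSL settings are left out. run_sql accepts any DB-API connection, so it can be exercised without a cluster.

```python
# Sketch only: connect_redshift and run_sql are hypothetical helpers.


def connect_redshift(host, database, user, password, port=5439):
    """Open a pg8000 DB-API connection to Redshift."""
    import pg8000  # imported lazily so this module loads even without the driver

    return pg8000.connect(host=host, port=port, database=database, user=user, password=password)


def run_sql(conn, statements, autocommit=False):
    """Execute statements in order; commit once at the end unless autocommit is on."""
    conn.autocommit = autocommit
    cursor = conn.cursor()
    try:
        for sql in statements:
            cursor.execute(sql)
        if not autocommit:
            conn.commit()
    except Exception:
        if not autocommit:
            conn.rollback()  # undo the partial batch before re-raising
        raise
    finally:
        cursor.close()
```

For a full reload, call run_sql with ["TRUNCATE dst_schema.dst_table;"] on a connection from connect_redshift before writing the DynamicFrame, then close the connection. Pass autocommit=True for statements such as VACUUM that Redshift refuses to run inside a transaction block.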



Answer 4:

If you are looking to do a full load, you can use the Databricks spark-redshift library from Spark/PySpark to overwrite the table:

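# df, redshift_url, redshift_table, user, redshift_password, redshift_copy_role and args come from the surrounding job
(df.write
    .format("com.databricks.spark.redshift")
    .option("url", redshift_url)                  # JDBC URL of the Redshift cluster
    .option("dbtable", redshift_table)            # table to replace
    .option("user", user)
    .option("password", redshift_password)
    .option("aws_iam_role", redshift_copy_role)   # IAM role Redshift uses to COPY from S3
    .option("tempdir", args["TempDir"])           # S3 staging location for the COPY
    .mode("overwrite")                            # replace the table's contents
    .save())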

Per the Databricks spark-redshift documentation:

Overwriting an existing table: By default, this library uses transactions to perform overwrites, which are implemented by deleting the destination table, creating a new empty table and appending rows to it.

You can take a look at the Databricks documentation here.