Here are some bullet points describing how I have things set up:

- I have CSV files uploaded to S3 and a Glue crawler set up to create the table and schema.
- I have a Glue job set up that writes the data from the Glue table to our Amazon Redshift database using a JDBC connection. The job is also responsible for mapping the columns and creating the Redshift table.
- By re-running the job, I am getting duplicate rows in Redshift (as expected).

However, is there a way to replace or delete rows before inserting the new data? The bookmark functionality is enabled but not working.

How can I connect to Redshift and delete all existing data as part of the job, before pushing the new data to Redshift, in Python?
Currently Glue doesn't support bookmarking for JDBC sources.

You can implement an upsert/merge into Redshift in a Glue job using the postactions option (code in Scala):
val fields = sourceDf.columns.mkString(",")  // comma-separated column list for the INSERT

glueContext.getJDBCSink(
  catalogConnection = "RedshiftConnectionTest",
  options = JsonOptions(Map(
    "database" -> "conndb",
    // write into a staging table first...
    "dbtable" -> "staging_schema.staging_table",
    // ...then merge the staging table into the destination and drop it
    "postactions" ->
      s"""
         DELETE FROM dst_schema.dst_table USING staging_schema.staging_table AS S WHERE dst_table.id = S.id;
         INSERT INTO dst_schema.dst_table ($fields) SELECT $fields FROM staging_schema.staging_table;
         DROP TABLE IF EXISTS staging_schema.staging_table
      """
  )),
  redshiftTmpDir = tempDir,
  transformationContext = "redshift-output"
).writeDynamicFrame(DynamicFrame(sourceDf, glueContext))
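Since the question asks about doing this in Python, here is a rough PySpark sketch of the same staging-table-plus-postactions pattern. It assumes the standard Glue job boilerplate has already created glueContext and args, and that source_dyf is the DynamicFrame being written; the connection, database, schema and column names are just the placeholders from the Scala example above.

# Build the column list from the DynamicFrame, mirroring the Scala snippet
fields = ", ".join(source_dyf.toDF().columns)

post_actions = f"""
    DELETE FROM dst_schema.dst_table USING staging_schema.staging_table AS S
        WHERE dst_table.id = S.id;
    INSERT INTO dst_schema.dst_table ({fields}) SELECT {fields} FROM staging_schema.staging_table;
    DROP TABLE IF EXISTS staging_schema.staging_table;
"""

glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=source_dyf,
    catalog_connection="RedshiftConnectionTest",
    connection_options={
        "database": "conndb",
        "dbtable": "staging_schema.staging_table",  # load into staging first
        "postactions": post_actions,                # then merge into the target
    },
    redshift_tmp_dir=args["TempDir"],
    transformation_ctx="redshift-output",
)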
If you just want to clear the existing table before each load, you can use the preactions parameter instead:
glueContext.getJDBCSink(
  catalogConnection = "RedshiftConnectionTest",
  options = JsonOptions(Map(
    "database" -> "conndb",
    "dbtable" -> "dst_schema.dst_table",
    "preactions" -> "DELETE FROM dst_schema.dst_table"
  )),
  redshiftTmpDir = tempDir,
  transformationContext = "redshift-output"
).writeDynamicFrame(DynamicFrame(sourceDf, glueContext))
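The preactions variant looks much the same in PySpark (again only a sketch, with the same placeholder names and the same assumption that glueContext, args and source_dyf already exist):

glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=source_dyf,
    catalog_connection="RedshiftConnectionTest",
    connection_options={
        "database": "conndb",
        "dbtable": "dst_schema.dst_table",
        "preactions": "DELETE FROM dst_schema.dst_table;",  # clear the target before the load
    },
    redshift_tmp_dir=args["TempDir"],
    transformation_ctx="redshift-output",
)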
As long as you have a unique key on your tables, ideally an integer primary key, the way that I tackle this is as follows:
- Implement a scheduling tool to allow running of jobs in order. I recommend Airflow (see the DAG sketch after this list).
- Initiate the Glue job to read from the source and write to a staging table. (The staging table will only contain the output from that Glue run, not necessarily all rows.)
- Wait for that Glue job to finish (using the scheduling tool).
- Initiate a SQL job running on Redshift that:

  a) deletes the matching rows from the target table

     delete from target
     where id in (select id from staging);

  b) inserts the data from staging into the target table

     insert into target select * from staging;

  c) truncates the staging table

     truncate staging;

  d) vacuums and analyzes both tables

     vacuum target to 100 percent;
     analyze target;
     vacuum staging;
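For illustration, a minimal Airflow DAG along those lines might look like the sketch below. Operator and parameter names depend on your Airflow and Amazon provider versions, and the Glue job name, connection id and table names are placeholders, not anything from the original question.

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

with DAG(
    dag_id="glue_redshift_upsert",
    start_date=datetime(2024, 1, 1),
    schedule=None,   # trigger manually, or add a cron expression
    catchup=False,
) as dag:

    # Run the Glue job that writes the new rows into the staging table;
    # GlueJobOperator waits for the job run to finish by default.
    load_staging = GlueJobOperator(
        task_id="load_staging",
        job_name="csv_to_redshift_staging",   # hypothetical Glue job name
    )

    # Merge staging into target on Redshift. Vacuum/analyze is best done as a
    # separate follow-up task, since VACUUM cannot run inside a transaction.
    merge_into_target = SQLExecuteQueryOperator(
        task_id="merge_into_target",
        conn_id="redshift_default",           # hypothetical Airflow connection id
        sql=[
            "delete from target where id in (select id from staging);",
            "insert into target select * from staging;",
            "truncate staging;",
        ],
    )

    load_staging >> merge_into_target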
You could use the Python module pg8000 to connect to Redshift and execute SQL to delete (drop/truncate) the data from your Glue script. pg8000 is pure Python, so it works with Glue.

Check out this link: AWS Glue - Truncate destination postgres table prior to insert

I have tried it and it works fine. Hope this helps you out.
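For reference, a minimal sketch of that approach; the connection details and table name are placeholders, and in a real Glue job you would read the credentials from the Glue connection, job parameters or Secrets Manager rather than hard-coding them.

import pg8000

# Placeholder connection details for the Redshift cluster
conn = pg8000.connect(
    host="my-cluster.xxxxxx.us-east-1.redshift.amazonaws.com",
    port=5439,
    database="conndb",
    user="glue_user",
    password="...",
)

cursor = conn.cursor()
# TRUNCATE is fastest, but it commits immediately in Redshift; use DELETE FROM
# instead if the clear-out needs to stay inside a larger transaction.
cursor.execute("TRUNCATE TABLE dst_schema.dst_table")
conn.commit()
cursor.close()
conn.close()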
If you are looking to do a full load, you can use the Spark/PySpark Databricks spark-redshift library to overwrite the table:
df.write \
    .format("com.databricks.spark.redshift") \
    .option("url", redshift_url) \
    .option("dbtable", redshift_table) \
    .option("user", user) \
    .option("password", redshift_password) \
    .option("aws_iam_role", redshift_copy_role) \
    .option("tempdir", args["TempDir"]) \
    .mode("overwrite") \
    .save()
Per Databricks/Spark documentation:
Overwriting an existing table: By default, this library uses
transactions to perform overwrites, which are implemented by deleting
the destination table, creating a new empty table and appending rows
to it.
You can take a look at the Databricks documentation here.