I have an Azure Databricks cluster that processes various tables and then, as a final step, pushes these tables into an Azure SQL Server database to be used by some other processes. I have a cell in Databricks that looks something like this:
def generate_connection():
    jdbcUsername = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlUserName")
    jdbcPassword = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlPassword")
    connectionProperties = {
        "user" : jdbcUsername,
        "password" : jdbcPassword,
        "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    }
    return connectionProperties

def generate_url():
    jdbcHostname = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlHostName")
    jdbcDatabase = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlDatabase")
    jdbcPort = 1433
    return "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)

def persist_table(table, sql_table, mode):
    jdbcUrl = generate_url()
    connectionProperties = generate_connection()
    table.write.jdbc(jdbcUrl, sql_table, properties=connectionProperties, mode=mode)

persist_table(spark.table("Sales.OpenOrders"), "Sales.OpenOrders", "overwrite")
persist_table(spark.table("Sales.Orders"), "Sales.Orders", "overwrite")
This works as expected. The problem is that the Orders table is very large and only a small fraction of the rows could possibly change each day. What I want to do is change the overwrite mode to append mode and change the data frame from the entire table to just the rows that could have changed. All of this I know how to do easily enough, but first I need to run a simple SQL statement against the Azure SQL database to remove the rows that are already there, so that the possibly changed rows can be inserted back.
I want to run a SQL statement against the Azure SQL database like
Delete From Sales.Orders Where CreateDate >= '01/01/2019'
You can use the pyodbc library for this. It lets you open a connection to the database and execute arbitrary SQL statements, including deletes.
Unfortunately, getting it working on Databricks is a bit of a pain. I wrote a blog post a while back which should help: https://datathirst.net/blog/2018/10/12/executing-sql-server-stored-procedures-on-databricks-pyspark
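As a rough illustration of the pyodbc approach, here is a minimal sketch rather than a tested Databricks recipe. The helper names (`build_odbc_connection_string`, `open_connection`, `delete_orders_since`) are hypothetical, and the driver name "ODBC Driver 17 for SQL Server" is an assumption: it has to match whatever ODBC driver is actually installed on the cluster.

```python
def build_odbc_connection_string(host, database, user, password,
                                 port=1433,
                                 driver="ODBC Driver 17 for SQL Server"):
    """Build an ODBC connection string.

    The default driver name is an assumption; use the name of the ODBC
    driver that is actually installed on the cluster.
    """
    return (
        f"DRIVER={{{driver}}};SERVER={host},{port};"
        f"DATABASE={database};UID={user};PWD={password}"
    )


def open_connection(connection_string):
    """Open a pyodbc connection (autocommit is off by default)."""
    # Imported lazily so the other helpers can be used without pyodbc installed.
    import pyodbc
    return pyodbc.connect(connection_string)


def delete_orders_since(conn, cutoff):
    """Delete Sales.Orders rows with CreateDate >= cutoff.

    Returns the number of rows deleted. The cutoff is passed as a query
    parameter instead of being formatted into the SQL text.
    """
    cursor = conn.cursor()
    try:
        cursor.execute(
            "DELETE FROM Sales.Orders WHERE CreateDate >= ?", (cutoff,)
        )
        deleted = cursor.rowcount
        conn.commit()
    except Exception:
        # Undo the partial work so the table is left unchanged on failure.
        conn.rollback()
        raise
    finally:
        cursor.close()
    return deleted
```

In a notebook, the host, database, user, and password would presumably come from the same `dbutils.secrets.get` calls used in `generate_url` and `generate_connection`. You would then call something like `delete_orders_since(conn, datetime.date(2019, 1, 1))` before `persist_table(changed_rows, "Sales.Orders", "append")`, where `changed_rows` stands for your filtered data frame. Passing the cutoff as a parameter also sidesteps the regional ambiguity of a literal such as '01/01/2019'.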