How to run SQL statement from Databricks cluster

2019-08-13 14:06发布

I have an Azure Databricks cluster that processes various tables and then as a final step I push these table into an Azure SQL Server to be used by some other processes. I have a cell in databricks that looks something like this:

def generate_connection():
  jdbcUsername = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlUserName")
  jdbcPassword = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlPassword")
  connectionProperties = {
    "user" : jdbcUsername,
    "password" : jdbcPassword,
    "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
  }
  return connectionProperties

def generate_url():
  jdbcHostname = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlHostName")
  jdbcDatabase = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlDatabase")
  jdbcPort = 1433
  return "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)


def persist_table(table, sql_table, mode):
  jdbcUrl = generate_url();
  connectionProperties = generate_connection()
  table.write.jdbc(jdbcUrl, sql_table, properties=connectionProperties, mode=mode)

persist_table(spark.table("Sales.OpenOrders"), "Sales.OpenOrders", "overwrite")
persist_table(spark.table("Sales.Orders"), "Sales.Orders", "overwrite")

This works as expected. The problem that I have is that the Orders table is very large and only a small fraction of the rows could possible change each day, so what I want to do is change the overwrite mode to the append mode and change the data frame from being the entire table to just the rows that could have changed. All of this I know how to do easily enough, but what I want to do is run a simple SQL statement against the Azure SQL database to remove the rows that are already going to be there, so that they possibly changed rows will be inserted back.

I want to run a SQL statement against the Azure SQL database like

Delete From Sales.Orders Where CreateDate >= '01/01/2019'

1条回答
聊天终结者
2楼-- · 2019-08-13 14:40

You need to use the pyodbc library. You can connect and use sql statements.

import pyodbc

conn = pyodbc.connect( 'DRIVER={ODBC Driver 17 for SQL Server};'
                       'SERVER=mydatabe.database.azure.net;'
                       'DATABASE=AdventureWorks;UID=jonnyFast;'
                       'PWD=MyPassword')

# Example doing a simple execute
conn.execute('INSERT INTO Bob (Bob1, Bob2) VALUES (?, ?)', ('A', 'B'))

Unfortunately to get it working on databricks is a bit of a pain. I wrote a blog post a while back which should help. https://datathirst.net/blog/2018/10/12/executing-sql-server-stored-procedures-on-databricks-pyspark

查看更多
登录 后发表回答