如何运行Databricks集群的SQL语句(How to run SQL statement fr

2019-09-28 01:37发布

我有其处理各种表格,然后作为最后的步骤我推这些表为天青SQL Server以通过一些其它方法中使用的天青Databricks群集。 我在databricks细胞看起来是这样的:

def generate_connection():
  jdbcUsername = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlUserName")
  jdbcPassword = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlPassword")
  connectionProperties = {
    "user" : jdbcUsername,
    "password" : jdbcPassword,
    "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
  }
  return connectionProperties

def generate_url():
  jdbcHostname = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlHostName")
  jdbcDatabase = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlDatabase")
  jdbcPort = 1433
  return "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)


def persist_table(table, sql_table, mode):
  jdbcUrl = generate_url();
  connectionProperties = generate_connection()
  table.write.jdbc(jdbcUrl, sql_table, properties=connectionProperties, mode=mode)

persist_table(spark.table("Sales.OpenOrders"), "Sales.OpenOrders", "overwrite")
persist_table(spark.table("Sales.Orders"), "Sales.Orders", "overwrite")

这按预期工作。 我的问题是,该订单表是非常大的,只有每一天的行可能可能变化的一小部分,所以我想要做的是被改变的覆盖模式的附加模式,改变数据帧整个表,只是行可能已经改变。 所有这一切,我知道该怎么做很轻松了,但我想要做的是对运行在Azure SQL数据库的简单SQL语句删除那些已经去那里行,使他们有可能改变的行会被插回。

我想对运行在Azure SQL数据库这样一个SQL语句

Delete From Sales.Orders Where CreateDate >= '01/01/2019'

Answer 1:

您需要使用pyodbc库。 您可以连接并使用SQL语句。

import pyodbc

conn = pyodbc.connect( 'DRIVER={ODBC Driver 17 for SQL Server};'
                       'SERVER=mydatabe.database.azure.net;'
                       'DATABASE=AdventureWorks;UID=jonnyFast;'
                       'PWD=MyPassword')

# Example doing a simple execute
conn.execute('INSERT INTO Bob (Bob1, Bob2) VALUES (?, ?)', ('A', 'B'))

不幸的是得到它的工作在databricks是一个有点疼痛。 我写了一篇博客文章而回,这应有助于。 https://datathirst.net/blog/2018/10/12/executing-sql-server-stored-procedures-on-databricks-pyspark



文章来源: How to run SQL statement from Databricks cluster