Spark CosmosDB Sink: org.apache.spark.sql.AnalysisException

2019-07-27 18:14发布

I am reading a data stream from Event Hub in Spark (using Databricks). My goal is to write the streamed data to Cosmos DB. However, I get the following error: org.apache.spark.sql.AnalysisException: 'write' can not be called on streaming Dataset/DataFrame.

Is this scenario not supported?

Spark versions: 2.2.0 and 2.3.0

Libraries used:

  • json-20140107
  • rxnetty-0.4.20
  • azure-documentdb-1.14.0
  • azure-documentdb-rx-0.9.0-rc2
  • azure-cosmosdb-spark_2.2.0_2.11-1.0.0
  • rxjava-1.3.0
  • azure-eventhubs-databricks_2.11-3.4.0

My code:

import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.streaming._
import com.microsoft.azure.cosmosdb.spark.config._

import org.apache.spark.sql.SparkSession
import org.apache.spark.eventhubs.common.EventHubsUtils
import org.apache.spark.sql
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming.Trigger

// Start from the configuration object supplied by the Event Hubs helper and
// layer the application-specific settings on top of it.
val sparkConfiguration = EventHubsUtils.initializeSparkStreamingConfigurations

sparkConfiguration.setAppName("MeetupStructuredStreaming")

// Streaming-related settings, applied in the order listed.
Seq(
  "spark.streaming.driver.writeAheadLog.allowBatching" -> "true",
  "spark.streaming.driver.writeAheadLog.batchingTimeout" -> "60000",
  "spark.streaming.receiver.writeAheadLog.enable" -> "true",
  "spark.streaming.driver.writeAheadLog.closeFileAfterWrite" -> "true",
  "spark.streaming.receiver.writeAheadLog.closeFileAfterWrite" -> "true",
  "spark.streaming.stopGracefullyOnShutdown" -> "true"
).foreach { case (key, value) => sparkConfiguration.set(key, value) }

// Obtain (or reuse) the session configured with the settings above.
val spark = SparkSession.builder().config(sparkConfiguration).getOrCreate()

// Connection settings for the target Cosmos DB collection; handed to the
// streaming sink through `.options(configMap)` further down in this script.
val configMap: Map[String, String] = Seq(
  ("Endpoint", "https://xxx.documents.azure.com:443/"),
  ("Masterkey", "xxxx"),
  ("Database", "meetup"),
  ("Collection", "event")
).toMap

// Streaming DataFrame read from the "eventhubs" source and flattened into a
// fixed set of columns extracted from each event's JSON body.
val ehStream = {
  // The payload lives in the `body` column; it is cast to a string so that
  // JSON paths can be evaluated against it.
  val body = col("body").cast(StringType)

  // Extracts a single JSON path from the event body.
  def field(path: String) = get_json_object(body, path)

  val rawEvents = spark.readStream
    .format("eventhubs")
    .option("eventhubs.policyname", "xxx")
    .option("eventhubs.policykey", "xxx")
    .option("eventhubs.namespace", "xx")
    .option("eventhubs.name", "meetup")
    .option("eventhubs.partition.count", "2")
    .option("eventhubs.maxRate", "100")
    .option("eventhubs.consumergroup", "streaming")
    .option("eventhubs.progressTrackingDir", "/tmp/eventhub-progress/meetup-events")
    .option("eventhubs.sql.containsProperties", "true")
    .load

  rawEvents.select(
    from_unixtime(col("enqueuedTime").cast(LongType)).alias("enqueuedTime"),
    // `mtime` is divided by 1000 before conversion — presumably it is in
    // epoch milliseconds; confirm against the event schema.
    from_unixtime(field("$.mtime").divide(1000)).alias("time"),
    field("$.name").alias("name"),
    field("$.event_url").alias("url"),
    field("$.status").alias("status"),
    field("$.venue.country").alias("country"),
    field("$.venue.city").alias("city"),
    field("$.group.category.shortname").alias("category")
  )
}

// Starts the streaming query that appends the flattened Event Hub rows to
// Cosmos DB through the connector's sink provider.
//
// Declared as `val`: the binding is never reassigned. Note that, despite the
// name, the value is whatever `start()` returns — per the Spark API that is
// the handle to the running query (a StreamingQuery), not a writer.
val cosmosDbStreamWriter = ehStream
  .writeStream
  .outputMode("append")
  .format(classOf[CosmosDBSinkProvider].getName)
  .options(configMap)
  // Location where the query stores its checkpoint data.
  .option("checkpointLocation", "/tmp/streamingCheckpoint")
  .trigger(Trigger.ProcessingTime(1000 * 3)) // every 3 seconds
  .start()

0条回答
登录 后发表回答