I am reading a data stream from Event Hubs in Spark (using Databricks). My goal is to write the streamed data to Cosmos DB. However, I get the following error when the streaming query runs: org.apache.spark.sql.AnalysisException: 'write' can not be called on streaming Dataset/DataFrame.
Is this scenario not supported?
Spark versions: 2.2.0 and 2.3.0
Libraries used:
- json-20140107
- rxnetty-0.4.20
- azure-documentdb-1.14.0
- azure-documentdb-rx-0.9.0-rc2
- azure-cosmosdb-spark_2.2.0_2.11-1.0.0
- rxjava-1.3.0
- azure-eventhubs-databricks_2.11-3.4.0
My code:
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.streaming._
import com.microsoft.azure.cosmosdb.spark.config._
import org.apache.spark.sql.SparkSession
import org.apache.spark.eventhubs.common.EventHubsUtils
import org.apache.spark.sql
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming.Trigger
// Spark configuration seeded by the Event Hubs helper, then named and tuned below.
val sparkConfiguration = EventHubsUtils.initializeSparkStreamingConfigurations
sparkConfiguration.setAppName("MeetupStructuredStreaming")

// Write-ahead-log and shutdown settings, applied in the listed order.
// NOTE(review): these are all `spark.streaming.*` keys — confirm they have any
// effect on a Structured Streaming (readStream/writeStream) job.
Seq(
  "spark.streaming.driver.writeAheadLog.allowBatching" -> "true",
  "spark.streaming.driver.writeAheadLog.batchingTimeout" -> "60000",
  "spark.streaming.receiver.writeAheadLog.enable" -> "true",
  "spark.streaming.driver.writeAheadLog.closeFileAfterWrite" -> "true",
  "spark.streaming.receiver.writeAheadLog.closeFileAfterWrite" -> "true",
  "spark.streaming.stopGracefullyOnShutdown" -> "true"
).foreach { case (key, value) => sparkConfiguration.set(key, value) }

// Reuse an existing session if there is one; otherwise create it from the configuration above.
val spark = SparkSession
  .builder()
  .config(sparkConfiguration)
  .getOrCreate()
// Cosmos DB connection settings; passed as options to the streaming sink further down.
// The endpoint and key values are placeholders.
val configMap: Map[String, String] = Seq(
  ("Endpoint", "https://xxx.documents.azure.com:443/"),
  ("Masterkey", "xxxx"),
  ("Database", "meetup"),
  ("Collection", "event")
).toMap
// Streaming DataFrame over the "meetup" Event Hub, projected down to the
// enqueue time plus a handful of fields extracted from the JSON message body.
val ehStream = {
  // Message body cast to a string so JSON paths can be evaluated against it.
  val payload = col("body").cast(StringType)

  // Extracts one value from the message body by JSON path.
  def jsonField(path: String) = get_json_object(payload, path)

  val rawEvents = spark.readStream
    .format("eventhubs")
    .option("eventhubs.policyname", "xxx")
    .option("eventhubs.policykey", "xxx")
    .option("eventhubs.namespace", "xx")
    .option("eventhubs.name", "meetup")
    .option("eventhubs.partition.count", "2")
    .option("eventhubs.maxRate", "100")
    .option("eventhubs.consumergroup", "streaming")
    .option("eventhubs.progressTrackingDir", "/tmp/eventhub-progress/meetup-events")
    .option("eventhubs.sql.containsProperties", "true")
    .load

  rawEvents.select(
    from_unixtime(col("enqueuedTime").cast(LongType)).as("enqueuedTime"),
    // "$.mtime" is divided by 1000 before conversion
    // (presumably milliseconds -> seconds; confirm against the producer).
    from_unixtime(jsonField("$.mtime") / 1000).as("time"),
    jsonField("$.name").as("name"),
    jsonField("$.event_url").as("url"),
    jsonField("$.status").as("status"),
    jsonField("$.venue.country").as("country"),
    jsonField("$.venue.city").as("city"),
    jsonField("$.group.category.shortname").as("category")
  )
}
// Starts the streaming query that appends every row of `ehStream` to the
// Cosmos DB collection described by `configMap`.
//
// Despite the name, the value bound here is what `start()` returns (the
// running query), not a writer. It is never reassigned, so it is a `val`.
//
// NOTE(review): the reported "'write' can not be called on streaming
// Dataset/DataFrame" error is not raised by any call visible in this chain;
// it presumably comes from inside the sink implementation. Confirm that the
// azure-cosmosdb-spark artifact matches the cluster's Spark version.
val cosmosDbStreamWriter = ehStream
  .writeStream
  .outputMode("append")
  .format(classOf[CosmosDBSinkProvider].getName)
  .options(configMap)
  // Location where the query stores its checkpoint data.
  .option("checkpointLocation", "/tmp/streamingCheckpoint")
  // Interval is given in milliseconds: fire a micro-batch every 3 seconds.
  .trigger(Trigger.ProcessingTime(1000 * 3))
  .start()