I want to write data from Twitter into Kafka. For educational purposes, I'm trying to do this with Structured Streaming. I have created a Twitter source, based on the socket source, which works well.
I set up my source as follows:
val tweets = spark
  .readStream
  .format("twitter")
  .option("query", terms)
  .load()
  .as[SparkTweet]
This gives me a nice Dataset for analytical queries. Great!
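For context, SparkTweet isn't shown here; assume it's just a plain case class carrying the fields I care about, roughly:

// Hypothetical shape -- the real class only needs to be a case class
// so that .as[SparkTweet] can derive an encoder.
case class SparkTweet(id: Long, user: String, text: String, createdAt: Long)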
Next, I want to persist each tweet, in its slightly sparkified schema, into Kafka:
val kafkaOutStream = tweets
  .toJSON.as("value")
  .writeStream
  .queryName("stream_to_kafka")
  .outputMode(OutputMode.Append())
  .trigger(Trigger.ProcessingTime("1 second"))
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "tweets")
  .start
That's easy! Except it doesn't work: in QueryExecution.scala the call passes into assertSupported and eventually gets thrown out with
Exception in thread "main" org.apache.spark.sql.AnalysisException:
Queries with streaming sources must be executed with writeStream.start();;
I didn't expect toJSON to be a pure batch operation, but without it, using say select($"text" as "value") instead, the code works (see the snippet below).
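For comparison, this is the variant that runs without complaint; it just pushes the raw tweet text as the Kafka value instead of the full JSON document:

// Works: the value column comes from a plain projection instead of toJSON.
val kafkaOutStream = tweets
  .select($"text" as "value")
  .writeStream
  .queryName("stream_to_kafka")
  .outputMode(OutputMode.Append())
  .trigger(Trigger.ProcessingTime("1 second"))
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "tweets")
  .start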
Now, I'm slightly flabbergasted and would love for someone to explain why toJSON shouldn't be streaming-compatible (is it a bug? a missing feature?), and to tell me whether there's a Structured Streaming way of getting a serialized representation of my objects into Kafka.
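(To make the last part of the question concrete: I could imagine the built-in to_json and struct functions doing the serialization inside the streaming plan, roughly as sketched below, but I don't know whether that is the intended approach or whether it even works in streaming, hence the question.)

import org.apache.spark.sql.functions.{struct, to_json}

// Sketch only: turn the whole row into a JSON string column named "value",
// staying within the streaming query plan.
val serialized = tweets.select(to_json(struct($"*")).as("value"))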