How to work around DataSet.toJSON being incompatible with Structured Streaming

Published 2019-05-21 02:39

Question:

I want to write data from Twitter into Kafka. For educational purposes, I am trying to do this with Structured Streaming. I have created a Twitter source, based on the socket source, which works well.

I set up my source as follows:

val tweets = spark
  .readStream
  .format("twitter")
  .option("query", terms)
  .load()
  .as[SparkTweet]

This gives me a nice DataSet for analytical queries. Great!
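SparkTweet here is just a plain case class matching the schema emitted by the custom source. For context, a minimal sketch of its shape (the field names are illustrative, not the actual schema):

// Illustrative only: the real SparkTweet mirrors whatever schema
// the custom "twitter" source produces.
case class SparkTweet(id: Long, user: String, text: String)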

Next, I want to persist each tweet, in its slightly sparkified schema, to Kafka:

val kafkaOutStream = tweets
  .toJSON.as("value")
  .writeStream
  .queryName("stream_to_kafka")
  .outputMode(OutputMode.Append())
  .trigger(Trigger.ProcessingTime("1 second"))
  .format("kafka")
  .option("kafka.bootstrap.servers","localhost:9092")
  .option("topic","tweets")
  .start
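Note that start returns a StreamingQuery immediately; in a standalone application the driver has to block on it, or the process exits before anything is written. A minimal sketch:

// Keep the driver alive until the streaming query terminates or fails.
kafkaOutStream.awaitTermination()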

That's easy! Except it doesn't work. In QueryExecution.scala the call passes into assertSupported, and the query eventually gets thrown out with:

Exception in thread "main" org.apache.spark.sql.AnalysisException:
    Queries with streaming sources must be executed with writeStream.start();;

I didn't expect toJSON to be a batch-only operation, but if I drop it and use, say, select($"text" as "value") instead, the code works (as sketched below).
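For reference, a sketch of that workaround, which writes only the raw tweet text as the Kafka message value (assuming import spark.implicits._ is in scope for the $ syntax):

// Workaround: the Kafka sink only requires a string or binary column
// named "value", so selecting the text column directly streams fine.
val textStream = tweets
  .select($"text".as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "tweets")
  .start()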

Now I'm slightly flabbergasted and would love for someone to explain why toJSON isn't streaming-compatible (is it a bug? a missing feature?) and to tell me whether there is a Structured Streaming way of getting a serialized representation of my objects into Kafka.

Answer 1:

It is a bit verbose, but the to_json function should do the trick:

import org.apache.spark.sql.functions.{to_json, struct, col}

tweets.select(to_json(struct(tweets.columns map col: _*)).alias("value"))
  .writeStream
  ...
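Spelled out against the query from the question (same trigger, broker, and topic), the full write would look roughly like this:

import org.apache.spark.sql.functions.{col, struct, to_json}
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

// Pack all columns into a struct, serialize it to a JSON string, and
// expose it as the "value" column that the Kafka sink reads from.
val kafkaOutStream = tweets
  .select(to_json(struct(tweets.columns.map(col): _*)).alias("value"))
  .writeStream
  .queryName("stream_to_kafka")
  .outputMode(OutputMode.Append())
  .trigger(Trigger.ProcessingTime("1 second"))
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "tweets")
  .start()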

The problem with toJSON seems to be this conversion to an RDD:

val rdd: RDD[String] = queryExecution.toRdd.mapPartitions { iter =>
  ...

and, as maasg pointed out in the comments, it appears to have already been resolved in the development version.