Queries with streaming sources must be executed wi

2020-06-08 15:33发布

问题:

I'm trying to read the messages from kafka (version 10) in spark and trying to print it.

     import spark.implicits._

         val spark = SparkSession
              .builder
              .appName("StructuredNetworkWordCount")
              .config("spark.master", "local")
              .getOrCreate()  

            val ds1 = spark.readStream.format("kafka")
              .option("kafka.bootstrap.servers", "localhost:9092")  
              .option("subscribe", "topicA")
              .load()

           ds1.collect.foreach(println)
           ds1.writeStream
           .format("console")
           .start()

           ds1.printSchema()

getting an error Exception in thread "main"

org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

回答1:

You are branching the query plan: from the same ds1 you are trying to:

  • ds1.collect.foreach(...)
  • ds1.writeStream.format(...){...}

But you are only calling .start() on the second branch, leaving the other dangling without a termination, which in turn throws the exception you are getting back.

The solution is to start both branches and await termination.

val ds1 = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")  
  .option("subscribe", "topicA")  
  .load()
val query1 = ds1.collect.foreach(println)
  .writeStream
  .format("console")
  .start()
val query2 = ds1.writeStream
  .format("console")
  .start()

ds1.printSchema()
query1.awaitTermination()
query2.awaitTermination()


回答2:

i fixed issue by using following code.

 val df = session
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", "streamTest2")
  .load();

    val query = df.writeStream
  .outputMode("append")
  .format("console")
  .start()
query.awaitTermination()


回答3:

I struggled a lot with this issue. I tried each of suggested solution from various blog. But I my case there are few statement in between calling start() on query and finally at last i was calling awaitTerminate() function that cause this.

Please try in this fashion, It is perfectly working for me. Working example:

val query = df.writeStream
      .outputMode("append")
      .format("console")
      .start().awaitTermination();

If you write in this way that will cause exception/ error:

val query = df.writeStream
      .outputMode("append")
      .format("console")
      .start()

    // some statement 
    // some statement 

    query.awaitTermination();

will throw given exception and will close your streaming driver.



回答4:

Kindly remove ds1.collect.foreach(println) and ds1.printSchema() , use outputMode and awaitAnyTermination for background process Waiting until any of the queries on the associated spark.streams has terminated

val spark = SparkSession
    .builder
    .appName("StructuredNetworkWordCount")
    .config("spark.master", "local[*]")
    .getOrCreate()

  val ds1 = spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "topicA")  .load()

  val consoleOutput1 = ds1.writeStream
     .outputMode("update")
     .format("console")
     .start()

  spark.streams.awaitAnyTermination()

|key|value|topic|partition|offset|
+---+-----+-----+---------+------+
+---+-----+-----+---------+------+


回答5:

I was able to resolves this issue by following code. In my scenario, I had multiple intermediate Dataframes, which were basically the transformations made on the inputDF.

 val query = joinedDF
      .writeStream
      .format("console")
      .option("truncate", "false")
      .outputMode(OutputMode.Complete())
      .start()
      .awaitTermination()

joinedDF is the result of the last transformation performed.