spark structured streaming exception : Append outp

Posted 2020-03-26 12:14

Question:

I perform a simple group-by on year with some aggregations, as shown below, and try to append the result to an HDFS path. I get the following error:

   org.apache.spark.sql.AnalysisException: Append output mode not supported 
   when there are streaming aggregations on streaming DataFrames/DataSets 
   without watermark;;
   Aggregate [year#88], [year#88, sum(rating#89) AS rating#173, 
   sum(cast(duration#90 as bigint)) AS duration#175L]
   +- EventTimeWatermark event_time#96: timestamp, interval 10 seconds

Below is my code. Can someone please help?

    val spark =SparkSession.builder().appName("mddd").
    enableHiveSupport().config("hive.exec.dynamic.partition", "true").
    config("hive.exec.dynamic.partition.mode", "nonstrict").
    config("spark.sql.streaming.checkpointLocation", "/user/sa/sparkCheckpoint").
    config("spark.debug.maxToStringFields",100).
    getOrCreate()

    val mySchema = StructType(Array(
     StructField("id", IntegerType),
     StructField("name", StringType),
     StructField("year", IntegerType),
     StructField("rating", DoubleType),
     StructField("duration", IntegerType)
    ))

    val xmlData = spark.readStream.option("sep", ",").schema(mySchema).csv("file:///home/sa/kafdata/") 
    import java.util.Calendar
    val df_agg_without_time= xmlData.withColumn("event_time", to_utc_timestamp(current_timestamp, Calendar.getInstance().getTimeZone().getID()))

    val df_agg_with_time = df_agg_without_time.withWatermark("event_time", "10 seconds").groupBy($"year").agg(sum($"rating").as("rating"),sum($"duration").as("duration"))
    val cop = df_agg_with_time.withColumn("column_name_with", to_json(struct($"window")))

    df_agg_with_time.writeStream.outputMode("append").partitionBy("year").format("csv").
    option("path", "hdfs://dio/apps/hive/warehouse/gt.db/sample_mov/").start()

My input is in CSV format:

    id,name,year,rating,duration
    1,The Nightmare Before Christmas,1993,3.9,4568
    2,The Mummy,1993,3.5,4388
    3,Orphans of the Storm,1921,3.2,9062
    4,The Object of Beauty,1921,2.8,6150
    5,Night Tide,1963,2.8,5126
    6,One Magic Christmas,1963,3.8,5333
    7,Muriel's Wedding,1963,3.5,6323
    8,Mother's Boys,1963,3.4,5733

My expected output should be in HDFS, partitioned by year:

    year,rating,duration
    1993,7.4,8956
    1921,6.0,15212
    1963,13.5,22515

I am really not sure what is wrong with my approach. Please help.

Answer 1:

This is a question with many aspects:

  • The Structured Streaming API has limitations, in my opinion.
  • You can pipeline multiple aggregation queries and technically the job runs, but it produces no output, so there is no value in doing that. Other functionality of this kind does not work either, even though the API lets you specify it.
  • The manual states: withWatermark must be called on the same column as the timestamp column used in the aggregate.

    For example, df.withWatermark("time", "1 min").groupBy("time2").count() is invalid in Append output mode, as the watermark is defined on a different column from the aggregation column. Simply stated, for Append you need a watermark on a column that takes part in the aggregation. Your watermark is on event_time, but you group by year only, so as far as the aggregation is concerned there is no watermark. I think this is your issue.

  • Is the following relevant when writing to a path?

  .enableHiveSupport().config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  • Also, your final use case is not known. Whether this is a good approach is an open question; there is too little insight for me to assess it, so I simply assume it is.
  • I am assuming that ratings for the same movie can arrive in a later microbatch.
  • There is no event_time in the feed, so you create it yourself. That is somewhat unrealistic, but we can live with it, although the timestamp handling is a little problematic.
  • I advise you to look at this blog, http://blog.madhukaraphatak.com/introduction-to-spark-structured-streaming-part-12/, for an excellent assessment of Structured Streaming.

So, in general:

  • Of the options Complete, Append and Update, I think you have chosen the right one, Append. Update could be used, but I leave that out of scope.
  • But you did not put event_time in a window. You should do this. I have put an example at the end, run in the Spark shell, where I could not get a case class to work (that is why it took so long); in a compiled program or on Databricks that is not an issue.
  • You cannot, functionally, chain multiple queries to do the aggregation that you tried. In my case it just produced errors.
  • I would advise you to use the timestamp approach I used; it is easier, and I could not test all of your code.
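
Applied to the pipeline in the question, the windowing advice above might look like the following sketch. It is untested against your cluster. The schema and paths are taken from the question; the 10-second window, the extra rows count column, the window_start / window_end column names and the use of plain current_timestamp() as event time are assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("mddd").getOrCreate()
import spark.implicits._

val mySchema = StructType(Array(
  StructField("id", IntegerType),
  StructField("name", StringType),
  StructField("year", IntegerType),
  StructField("rating", DoubleType),
  StructField("duration", IntegerType)
))

// Assumption: processing time stands in for the missing event time.
val movies = spark.readStream
  .option("sep", ",")
  .schema(mySchema)
  .csv("file:///home/sa/kafdata/")
  .withColumn("event_time", current_timestamp())

// The watermarked column now takes part in the grouping through window(),
// which is what Append mode requires for a streaming aggregation.
val aggregated = movies
  .withWatermark("event_time", "10 seconds")
  .groupBy(window($"event_time", "10 seconds"), $"year") // window size is an assumption
  .agg(
    sum($"rating").as("rating"),
    sum($"duration").as("duration"),
    count(lit(1)).as("rows") // hypothetical helper column: rows behind each sum
  )

// The CSV writer cannot store struct columns, so flatten the window struct.
val flat = aggregated.select(
  $"window.start".as("window_start"),
  $"window.end".as("window_end"),
  $"year", $"rating", $"duration", $"rows"
)

flat.writeStream
  .outputMode("append")
  .partitionBy("year")
  .format("csv")
  .option("path", "hdfs://dio/apps/hive/warehouse/gt.db/sample_mov/")
  .option("checkpointLocation", "/user/sa/sparkCheckpoint")
  .start()
```

In Append mode a window's row is written only once the watermark has passed the end of that window, so output appears with a delay and only after newer data has arrived. A given year can also fall into several windows, so the output can contain more than one row per year.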

Then:

  • Either write the output of this module to a Kafka topic, read that topic into another module, do the second aggregation there and write out the result, taking into account that ratings for the same movie can arrive in different microbatches.
  • Or write the data out as it is, including a count field, and then provide a view layer for querying that takes into account the fact that there were multiple writes.
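
For the second option, a view layer could be as small as a batch job that re-aggregates the partial rows per year. The sketch below is untested and assumes a hypothetical file layout (window_start, window_end, rating, duration, rows, with year recovered from the year=... partition directories); the view name sample_mov_totals is also made up for the example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("sample_mov_view").getOrCreate()
import spark.implicits._

// Hypothetical layout of the CSV files written by the streaming job.
// The window bounds are read as plain strings because the view ignores them.
val partialSchema = StructType(Array(
  StructField("window_start", StringType),
  StructField("window_end", StringType),
  StructField("rating", DoubleType),
  StructField("duration", LongType),
  StructField("rows", LongType)
))

// year is not stored inside the files; it comes from the partition directories.
val partials = spark.read
  .schema(partialSchema)
  .csv("hdfs://dio/apps/hive/warehouse/gt.db/sample_mov/")

// Collapse the multiple writes per year into one total per year.
val totals = partials
  .groupBy($"year")
  .agg(
    sum($"rating").as("rating"),
    sum($"duration").as("duration"),
    sum($"rows").as("rows")
  )

totals.createOrReplaceTempView("sample_mov_totals")
spark.sql("SELECT year, rating, duration FROM sample_mov_totals ORDER BY year").show()
```

Because sums and counts add up across writes, the view stays correct no matter how many microbatches contributed rows for the same year.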

Here is a sample using socket input and the Spark shell, which you can extrapolate to your own data, followed by the output of one microbatch (note that there are delays before the data shows up):

import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode

val sparkSession = SparkSession.builder
  .master("local")
  .appName("example")
  .getOrCreate()
//create stream from socket

import sparkSession.implicits._
sparkSession.sparkContext.setLogLevel("ERROR")
val socketStreamDs = sparkSession.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
  .as[String]

// Parse "epochMillis,symbol,value" lines into (time, symbol, value) rows
val stockDs = socketStreamDs
  .map(_.trim.split(","))
  .map(entries => (new Timestamp(entries(0).toLong), entries(1), entries(2).toDouble))
  .toDF("time", "symbol", "value")

val windowedCount = stockDs
  .withWatermark("time", "20000 milliseconds")
  .groupBy( 
    window($"time", "10 seconds"),
           $"symbol" 
  )
  .agg(sum("value"), count($"symbol"))

val query =
  windowedCount.writeStream
    .format("console")
    .option("truncate", "false")
    .outputMode(OutputMode.Append())

query.start().awaitTermination()

results in:

Batch: 14
+---------------------------------------------+------+----------+-------------+
|window                                       |symbol|sum(value)|count(symbol)|
+---------------------------------------------+------+----------+-------------+
|[2016-04-27 04:34:20.0,2016-04-27 04:34:30.0]|"aap1"|4200.0    |6            |
|[2016-04-27 04:34:30.0,2016-04-27 04:34:40.0]|"app1"|800.0     |2            |
|[2016-04-27 04:34:20.0,2016-04-27 04:34:30.0]|"aap2"|2500.0    |1            |
|[2016-04-27 04:34:40.0,2016-04-27 04:34:50.0]|"app1"|2800.0    |4            |
+---------------------------------------------+------+----------+-------------+

It's quite a big topic and you need to look at it holistically.

You can see from the output that having a count can be handy in some cases, for example when combining partial results into an overall average. Good luck.
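
One case where the count helps is an overall average across windows. The plain-Scala sketch below uses the two "app1" rows from the output above; the Partial and OverallAverage names are made up for the example. It compares dividing the total sum by the total count with naively averaging the per-window averages.

```scala
// One partial result: a sum and the number of rows behind it.
final case class Partial(sum: Double, count: Long)

object OverallAverage {
  // Total sum divided by total count: every input row has equal weight.
  def weighted(parts: Seq[Partial]): Double =
    parts.map(_.sum).sum / parts.map(_.count).sum

  // Unweighted mean of the per-window averages: windows with few rows
  // count as much as windows with many rows, which skews the result.
  def naive(parts: Seq[Partial]): Double =
    parts.map(p => p.sum / p.count).sum / parts.size

  def main(args: Array[String]): Unit = {
    val app1 = Seq(Partial(800.0, 2), Partial(2800.0, 4))
    println(weighted(app1)) // 600.0
    println(naive(app1))    // 550.0
  }
}
```

The two results differ because the second window holds twice as many rows as the first, which is information only the count column carries.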