I have performed a simple group-by on year with some aggregation, as shown below, and I tried to append the result to an HDFS path. I am getting an error saying:
org.apache.spark.sql.AnalysisException: Append output mode not supported
when there are streaming aggregations on streaming DataFrames/DataSets
without watermark;;
Aggregate [year#88], [year#88, sum(rating#89) AS rating#173,
sum(cast(duration#90 as bigint)) AS duration#175L]
+- EventTimeWatermark event_time#96: timestamp, interval 10 seconds
Below is my code. Can someone please help?
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("mddd")
  .enableHiveSupport()
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .config("spark.sql.streaming.checkpointLocation", "/user/sa/sparkCheckpoint")
  .config("spark.debug.maxToStringFields", 100)
  .getOrCreate()
import spark.implicits._

val mySchema = StructType(Array(
  StructField("id", IntegerType),
  StructField("name", StringType),
  StructField("year", IntegerType),
  StructField("rating", DoubleType),
  StructField("duration", IntegerType)
))

val xmlData = spark.readStream.option("sep", ",").schema(mySchema).csv("file:///home/sa/kafdata/")

import java.util.Calendar
val df_agg_without_time = xmlData.withColumn("event_time",
  to_utc_timestamp(current_timestamp(), Calendar.getInstance().getTimeZone().getID()))

val df_agg_with_time = df_agg_without_time
  .withWatermark("event_time", "10 seconds")
  .groupBy($"year")
  .agg(sum($"rating").as("rating"), sum($"duration").as("duration"))

val cop = df_agg_with_time.withColumn("column_name_with", to_json(struct($"window")))

df_agg_with_time.writeStream.outputMode("append").partitionBy("year").format("csv")
  .option("path", "hdfs://dio/apps/hive/warehouse/gt.db/sample_mov/").start()
My input is in CSV format:
id,name,year,rating,duration
1,The Nightmare Before Christmas,1993,3.9,4568
2,The Mummy,1993,3.5,4388
3,Orphans of the Storm,1921,3.2,9062
4,The Object of Beauty,1921,2.8,6150
5,Night Tide,1963,2.8,5126
6,One Magic Christmas,1963,3.8,5333
7,Muriel's Wedding,1963,3.5,6323
8,Mother's Boys,1963,3.4,5733
My expected output should be in HDFS, partitioned by year:
year,rating,duration
1993,7.4,8956
1921,6.0,15212
1963,10.7,17389
I am really not sure what's wrong with my approach. Please help.
This is a question with many aspects:
- It is not clear why you need the Hive settings below, since you write CSV files directly to an HDFS path rather than to a Hive table:
  .enableHiveSupport().config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
- Also, your final use case is not known. The question is whether this is a good approach, but there is too little insight for me to assess that, so we simply assume it is.
- We are assuming that ratings for the same movie will be part of a future microbatch.
- There is no event_time in the feed, so you create it yourself. That is somewhat unrealistic, but OK, we can live with it, although a processing-time timestamp is a little problematic.
- I advise you look at this blog http://blog.madhukaraphatak.com/introduction-to-spark-structured-streaming-part-12/ for an excellent assessment of Structured Streaming.
So, in general:
- Of the output modes Complete, Append and Update, I think you have chosen the right one, Append. Update could be used, but I leave that out of scope.
- But you did not put event_time in a window, and you should do this; a windowed version of your own query is sketched right after this list. I have also put a fuller example at the end, run in the Spark shell, where I could not get a case class to work - that's why it took so long - but in a compiled program, or on Databricks, that is not an issue.
- You cannot, functionally, chain multiple streaming queries to do the aggregation the way you tried; in my case it just produces errors.
- I would advise you to use the timestamp approach I used below; it is easier, as I could not test all of your setup.
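Here is a minimal, untested sketch of how your own aggregation could carry a window on event_time so that Append mode can emit finalized rows. It reuses your df_agg_without_time and output path; the 10-second window size and the window_start column are only examples, not something from your code:

import org.apache.spark.sql.functions._
import spark.implicits._

val windowedAgg = df_agg_without_time
  .withWatermark("event_time", "10 seconds")
  .groupBy(
    window($"event_time", "10 seconds"),          // grouping on the event-time window lets Append finalize rows once the watermark passes
    $"year"
  )
  .agg(sum($"rating").as("rating"), sum($"duration").as("duration"))
  .withColumn("window_start", $"window.start")    // the CSV sink cannot write the struct window column directly
  .drop("window")

windowedAgg.writeStream
  .outputMode("append")
  .partitionBy("year")
  .format("csv")
  .option("path", "hdfs://dio/apps/hive/warehouse/gt.db/sample_mov/")
  .start()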
Then:
- Either write the output of this module to a Kafka topic, read that topic in another module, do the second aggregation there and write out, taking into account that you can get multiple ratings for a movie in different microbatches (a rough sketch follows this list).
- Or write the data out as it is, including a count field, and then provide a view layer for querying that takes into account the fact that there were multiple writes.
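To illustrate the first option, here is a rough sketch of the Kafka sink, assuming the windowedAgg DataFrame sketched above; the broker address, topic name and checkpoint sub-path are placeholders, and the spark-sql-kafka package must be on the classpath:

// the Kafka sink needs a string or binary "value" column and a checkpoint location
val toKafka = windowedAgg
  .select(to_json(struct($"window_start", $"year", $"rating", $"duration")).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")            // placeholder broker
  .option("topic", "movie_agg")                                   // placeholder topic
  .option("checkpointLocation", "/user/sa/sparkCheckpoint/kafka") // placeholder checkpoint path
  .outputMode("append")
  .start()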
Here is a sample using socket input and the Spark shell that you can extrapolate to your own data, followed by the output of a microbatch (note there are delays in seeing the data):
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode

val sparkSession = SparkSession.builder
  .master("local")
  .appName("example")
  .getOrCreate()

// create stream from socket
import sparkSession.implicits._
sparkSession.sparkContext.setLogLevel("ERROR")

val socketStreamDs = sparkSession.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
  .as[String]

// parse "epochMillis,symbol,value" lines into a DataFrame with a proper timestamp
val stockDs = socketStreamDs
  .map(value => value.trim.split(","))
  .map(entries => (new Timestamp(entries(0).toLong), entries(1), entries(2).toDouble))
  .toDF("time", "symbol", "value") //.toDS()

val windowedCount = stockDs
  .withWatermark("time", "20000 milliseconds")
  .groupBy(
    window($"time", "10 seconds"),
    $"symbol"
  )
  .agg(sum("value"), count($"symbol"))

val query = windowedCount.writeStream
  .format("console")
  .option("truncate", "false")
  .outputMode(OutputMode.Append())

query.start().awaitTermination()
results in:
Batch: 14
+---------------------------------------------+------+----------+-------------+
|window                                       |symbol|sum(value)|count(symbol)|
+---------------------------------------------+------+----------+-------------+
|[2016-04-27 04:34:20.0,2016-04-27 04:34:30.0]|"aap1"|4200.0 |6 |
|[2016-04-27 04:34:30.0,2016-04-27 04:34:40.0]|"app1"|800.0 |2 |
|[2016-04-27 04:34:20.0,2016-04-27 04:34:30.0]|"aap2"|2500.0 |1 |
|[2016-04-27 04:34:40.0,2016-04-27 04:34:50.0]|"app1"|2800.0 |4 |
+---------------------------------------------+------+----------+-------------+
It's quite a big topic and you need to look at it holistically.
You can see from the output that having a count could be handy in some cases, although an avg output can also be used to compute an overall average; a rough sketch of such a view layer follows. Success.
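For completeness, a hypothetical sketch of the view-layer idea from above: re-aggregate the per-microbatch output in batch mode. The path and the column names sum_value and cnt are placeholders for whatever you actually write out:

// batch re-aggregation over the files written by the streaming query
val written = sparkSession.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("hdfs://dio/apps/hive/warehouse/gt.db/sample_agg/")   // placeholder path

val overall = written
  .groupBy($"symbol")
  .agg(
    sum($"sum_value").as("total_value"),
    (sum($"sum_value") / sum($"cnt")).as("overall_avg")      // weighted average across microbatches
  )

overall.show(false)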