Why does streaming aggregation delay until two batches?

Posted 2019-04-14 19:25

I use Spark 2.3.0.

My issue is that whenever I add a third batch of data to my input directory, the first batch of data gets processed and printed to the console. Why?

import java.util.Calendar
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("micro1")
  .enableHiveSupport()
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .config("spark.sql.streaming.checkpointLocation", "/user/sas/sparkCheckpoint")
  .config("spark.sql.parquet.cacheMetadata","false")
  .getOrCreate()

import spark.implicits._
import org.apache.spark.sql.functions._

// Left side of a join
import org.apache.spark.sql.types._
val mySchema = new StructType()
  .add("id", IntegerType)
  .add("name", StringType)
  .add("year", IntegerType)
  .add("rating", DoubleType)
  .add("duration", IntegerType)
val xmlData = spark
  .readStream
  .option("sep", ",")
  .schema(mySchema)
  .csv("tostack")

// Right side of a join
val mappingSchema = new StructType()
  .add("id", StringType)
  .add("megavol", StringType)
val staticData = spark
  .read
  .option("sep", ",")
  .schema(mappingSchema)
  .csv("input_tost_static.csv") 

xmlData.createOrReplaceTempView("xmlupdates")
staticData.createOrReplaceTempView("mappingdata")

spark
  .sql("select * from xmlupdates a join mappingdata b on  a.id=b.id")
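  // event_time is the processing time of the micro-batch (current_timestamp), not a column from the CSV
  .withColumn(
    "event_time",
    to_utc_timestamp(current_timestamp(), Calendar.getInstance().getTimeZone().getID()))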
  .withWatermark("event_time", "10 seconds")
  .groupBy(window($"event_time", "10 seconds", "10 seconds"), $"year")
  .agg(
    sum($"rating") as "rating",
    sum($"duration") as "duration",
    sum($"megavol") as "sum_megavol")
  .drop("window")
  .writeStream
  .outputMode("append")
  .format("console")
  .start()

My output shows the data below. I started the streaming query first and added data to the input folder later. When I add my third file, the aggregated results of the first file are printed. Why?

     -------------------------------------------
     Batch: 0
     -------------------------------------------
     +----+------+--------+-----------+
     |year|rating|duration|sum_megavol|
     +----+------+--------+-----------+
     +----+------+--------+-----------+

     -------------------------------------------
     Batch: 1
     -------------------------------------------
     +----+------+--------+-----------+
     |year|rating|duration|sum_megavol|
     +----+------+--------+-----------+
     +----+------+--------+-----------+

     -------------------------------------------
     Batch: 2
     -------------------------------------------
     +----+------+--------+-----------+
     |year|rating|duration|sum_megavol|
     +----+------+--------+-----------+
     |1963|   2.8|    5126|       46.0|
     |1921|   6.0|   15212|     3600.0|
     +----+------+--------+-----------+

The input data is as follows:

1,The Nightmare Before Christmas,1993,3.9,4568
2,The Mummy,1993,3.5,4388
3,Orphans of the Storm,1921,3.2,9062
4,The Object of Beauty,1921,2.8,6150
5,Night Tide,1963,2.8,5126
6,One Magic Christmas,1963,3.8,5333
7,Muriel's Wedding,1963,3.5,6323
8,Mother's Boys,1963,3.4,5733

The input_tost_static.csv dataset is as follows:

3,3000
4,600
5,46

Can someone help me understand why Spark Structured Streaming shows this behaviour? Do I need to add any settings here?

UPDATE: I get results in batch 1 itself if I print the stream before the JOIN operation. The issue only appears after the join: the output is delayed by more than 3 batches.

1 answer

迷人小祖宗
Answered 2019-04-14 20:08

"I have started the streaming first"

Batch: 0 is executed right after you start the query, and since no events have been streamed yet, there is no output.

At this point, the event-time watermark is not set at all.

"and later added data into the particular folder."

That could be Batch: 1.

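The event-time watermark is then derived from current_timestamp: after each batch it becomes the maximum event_time seen so far minus "10 seconds" (according to withWatermark("event_time", "10 seconds")). In append mode, a window's result is printed only once the watermark has reached the end of that window, so we have to wait "10 seconds" beyond the end of the window before getting any output.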

"When I add my third file the first file aggregated results are getting printed. Why?"

That could be Batch: 2.

I assume that you added the next files after the previous current_timestamp + "10 seconds", so the watermark moved past the end of the first window and you got the output.
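The timeline above can be sketched with a small, self-contained Scala model. It is a simplification written for illustration, not Spark's implementation, and it assumes that each micro-batch runs with the watermark computed at the end of the previous batch (maximum event_time seen so far minus the delay) and that append mode emits a window only once that watermark has reached the window's end. The names WatermarkModel, windowOf, watermarks and emittedInBatch, and the millisecond timestamps, are hypothetical.

```scala
// Simplified model for illustration (not Spark's implementation) of how an
// event-time watermark closes tumbling windows in append output mode.
object WatermarkModel {
  // Start (inclusive) and end (exclusive) of the tumbling window containing t.
  def windowOf(t: Long, sizeMs: Long): (Long, Long) = {
    val start = t - Math.floorMod(t, sizeMs)
    (start, start + sizeMs)
  }

  // Watermark each batch runs with: the maximum event time seen in all
  // earlier batches minus the delay; None while no data has been seen.
  def watermarks(batchMaxEventTimes: Seq[Option[Long]], delayMs: Long): Seq[Option[Long]] =
    batchMaxEventTimes
      .scanLeft(Option.empty[Long]) { (seen, batchMax) =>
        (seen.toList ++ batchMax.toList).reduceOption(_ max _)
      }
      .init // drop the value computed after the last batch
      .map(_.map(_ - delayMs))

  // Index of the first batch whose watermark has reached the window end,
  // i.e. the batch in which append mode would emit that window.
  def emittedInBatch(windowEnd: Long, wms: Seq[Option[Long]]): Option[Int] = {
    val i = wms.indexWhere(_.exists(_ >= windowEnd))
    if (i < 0) None else Some(i)
  }

  def main(args: Array[String]): Unit = {
    val sizeMs = 10000L  // window($"event_time", "10 seconds", "10 seconds")
    val delayMs = 10000L // withWatermark("event_time", "10 seconds")
    // Hypothetical max event_time (ms) per batch: batch 0 is empty,
    // then one file is picked up in each of batches 1, 2 and 3.
    val batches = Seq(None, Some(100000L), Some(121000L), Some(133000L))
    val wms = watermarks(batches, delayMs)
    val (_, firstWindowEnd) = windowOf(100000L, sizeMs)
    println(s"watermarks = $wms")
    println(s"first window emitted in batch: ${emittedInBatch(firstWindowEnd, wms).fold("not yet")(_.toString)}")
  }
}
```

With these hypothetical timestamps, the first file's window [100000, 110000) is emitted only in batch 3, when the third file arrives: batch 2 has to observe an event_time of at least 120000 ms before the watermark (max event_time minus the 10-second delay) reaches the window end, and only the following batch runs with that watermark.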

Please note that the watermark delay can also be just 0, which means that no late data is expected.
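To see what a 0 delay changes, here is a minimal arithmetic sketch under the answer's assumption that append mode emits a window once the watermark (max event_time minus the delay) reaches the window end. The object and function names and the sample timestamp are hypothetical; the code only computes the smallest event_time a later batch must observe before the window containing a given row can be closed.

```scala
// Illustrative arithmetic (not Spark code): smallest event_time (ms) a later
// batch must observe before the tumbling window containing `t` can be closed
// in append mode, assuming watermark = max event_time - delay.
object ClosingTime {
  def closingEventTime(t: Long, windowMs: Long, delayMs: Long): Long = {
    val windowEnd = t - Math.floorMod(t, windowMs) + windowMs
    windowEnd + delayMs // watermark reaches windowEnd once max event_time >= windowEnd + delay
  }

  def main(args: Array[String]): Unit = {
    val t = 103000L // hypothetical event_time of a row from the first file
    println(closingEventTime(t, 10000L, 10000L)) // "10 seconds" delay: 120000
    println(closingEventTime(t, 10000L, 0L))     // "0 seconds" delay:  110000
  }
}
```

A 0-second delay lets the window close a full 10 seconds earlier. Because event_time in this query comes from current_timestamp, rows should never arrive behind the watermark, so a 0-second delay should not drop anything here.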
