Spark streaming data sharing between batches

Posted 2020-05-24 19:49

Spark Streaming processes data in micro-batches.

Each interval's data is processed in parallel using RDDs, without any data sharing between intervals.

But my use case needs to share the data between intervals.

Consider the Network WordCount example which produces the count of all words received in that interval.

How would I produce the following word count?

  • Relative count for the words "hadoop" and "spark" with the previous interval count

  • Normal word count for all other words.

Note: updateStateByKey does stateful processing, but it applies the update function to every record instead of only to particular records.

So updateStateByKey doesn't fit this requirement.

Update:

Consider the following example:

Interval-1

Input:

Sample Input with Hadoop and Spark on Hadoop

Output:

hadoop  2
sample  1
input   1
with    1
and     1
spark   1
on      1

Interval-2

Input:

Another Sample Input with Hadoop and Spark on Hadoop and another hadoop another spark spark

Output:

another 3
hadoop  1
spark   2
and     2
sample  1
input   1
with    1
on      1

Explanation:

1st interval gives the normal word count of all words.

In the 2nd interval, hadoop occurred 3 times, but the output should be 1 (3-2).

spark occurred 3 times, but the output should be 2 (3-1).

For all other words, it should give the normal word count.

So, while processing the 2nd interval's data, it should have the 1st interval's word counts for hadoop and spark.
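
The arithmetic described above can be sketched in plain Scala collections, without Spark. The object name `RelativeCountExample` and the helpers `wordCount` and `relativeCounts` are hypothetical names used only for illustration; this shows the desired result, not a streaming implementation.

```scala
// Illustrative sketch only: plain Scala collections, no Spark involved.
object RelativeCountExample {
  // Case-insensitive word count of a single line of input
  def wordCount(line: String): Map[String, Int] =
    line.toLowerCase.split("\\s+").groupBy(identity).map { case (w, ws) => w -> ws.length }

  // Subtract the previous interval's count for tracked words only;
  // every other word keeps its normal count.
  def relativeCounts(
      current: Map[String, Int],
      previous: Map[String, Int],
      tracked: Set[String]): Map[String, Int] =
    current.map { case (word, n) =>
      if (tracked(word)) word -> (n - previous.getOrElse(word, 0)) else word -> n
    }

  val tracked = Set("hadoop", "spark")

  val interval1 = wordCount("Sample Input with Hadoop and Spark on Hadoop")
  val interval2 = wordCount(
    "Another Sample Input with Hadoop and Spark on Hadoop and another hadoop another spark spark")

  // Interval 1 has no previous data, so it is a normal word count
  val out1 = relativeCounts(interval1, Map.empty, tracked)

  // Interval 2 only needs the tracked words' counts from interval 1
  val out2 = relativeCounts(interval2, interval1.filter { case (w, _) => tracked(w) }, tracked)
}
```

Here `out2("hadoop")` is 1 and `out2("spark")` is 2, matching the Interval-2 output listed above. Only the tracked words' counts have to be carried from one interval to the next.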

This is a simple example for illustration.

In the actual use case, the fields that need data sharing are part of the RDD element (RDD), and a huge number of values need to be tracked.

That is, where this example tracks only the keywords hadoop and spark, the real job needs to track nearly 100k keywords.

Similar use cases in Apache Storm:

Distributed caching in storm

Storm TransactionalWords

1 answer
chillily
#2 · 2020-05-24 19:57

This is possible by "remembering" the last RDD received and using a left join to merge that data with the next streaming batch. We make use of streamingContext.remember to enable RDDs produced by the streaming process to be kept for the time we need them.

We make use of the fact that dstream.transform is an operation that executes on the driver and therefore we have access to all local object definitions. In particular we want to update the mutable reference to the last RDD with the required value on each batch.

A piece of code probably makes the idea clearer:

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Seconds

// Configure the streaming context to remember the RDDs it produces.
// xx is a placeholder: choose at least 2x the streaming interval, in seconds.
ssc.remember(Seconds(xx))

// Initialize "currentData" with an empty RDD of the expected type
var currentData: RDD[(String, Int)] = sparkContext.emptyRDD

// Classic word count
val w1dstream = dstream.map(elem => (elem, 1))
val count = w1dstream.reduceByKey(_ + _)

// Here's the key to making this work: the transform function runs on the driver
// for each batch, so we can update the reference to the last RDD after using it.
val diffCount = count.transform { rdd =>
  val interestingKeys = Set("hadoop", "spark")
  // Counts of the tracked words in this batch, kept for the next batch
  val interesting = rdd.filter { case (k, _) => interestingKeys(k) }
  // Subtract the previous batch's count where one exists; other words are unchanged
  val countDiff = rdd.leftOuterJoin(currentData).map { case (k, (v1, v2)) => (k, v1 - v2.getOrElse(0)) }
  currentData = interesting
  countDiff
}

diffCount.print()
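
To try this locally, the following sketch feeds the two example intervals from the question through the same transform logic, using `queueStream` in place of a real source. The object name `RelativeWordCountDemo`, the `local[2]` master, the 1-second batch interval, the 4-second remember window, and the driver-side `results` buffer are all assumptions made for illustration. It also assumes Spark Streaming is on the classpath.

```scala
import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical local harness; names and settings are illustrative assumptions.
object RelativeWordCountDemo {
  def run(): Seq[Map[String, Int]] = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("RelativeWordCountDemo")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.remember(Seconds(4)) // more than 2x the 1-second batch interval

    def words(line: String): RDD[String] =
      ssc.sparkContext.parallelize(line.toLowerCase.split(" ").toSeq)

    // One queued RDD is consumed per batch interval
    val input = mutable.Queue(
      words("Sample Input with Hadoop and Spark on Hadoop"),
      words("Another Sample Input with Hadoop and Spark on Hadoop and another hadoop another spark spark"))

    var currentData: RDD[(String, Int)] = ssc.sparkContext.emptyRDD
    val count = ssc.queueStream(input, oneAtATime = true).map(w => (w, 1)).reduceByKey(_ + _)

    // Same pattern as above: join against the previous batch, then replace it
    val diffCount = count.transform { rdd =>
      val interestingKeys = Set("hadoop", "spark")
      val countDiff = rdd.leftOuterJoin(currentData).map { case (k, (v1, v2)) => (k, v1 - v2.getOrElse(0)) }
      currentData = rdd.filter { case (k, _) => interestingKeys(k) }
      countDiff
    }

    // Collect each non-empty batch result on the driver
    val results = mutable.ArrayBuffer.empty[Map[String, Int]]
    diffCount.foreachRDD { rdd =>
      val batch = rdd.collect().toMap
      if (batch.nonEmpty) results += batch
    }

    ssc.start()
    ssc.awaitTerminationOrTimeout(10000)
    ssc.stop(stopSparkContext = true, stopGracefully = true)
    results.toSeq
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

Once the queue is drained, `queueStream` produces empty batches, which is why empty results are skipped. For a keyword set closer to the 100k mentioned in the question, the set would probably be better defined once outside the transform function rather than rebuilt on every batch.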