Here is a simple code example to illustrate my question:
```scala
import org.apache.flink.streaming.api.scala._

case class Record(key: String, value: Int)

object Job extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val data = env.fromElements(
    Record("01", 1), Record("02", 2), Record("03", 3), Record("04", 4), Record("05", 5))

  // Drops records whose value is a multiple of 3, so key "03" loses one record.
  val step1 = data.filter(record => record.value % 3 != 0)
  val step2 = data.map(r => Record(r.key, r.value * 2))
  val step3 = data.map(r => Record(r.key, r.value * 3))

  // Every key now has 3 records, except "03", which has only 2.
  val merged = step1.union(step2, step3)

  val keyed = merged.keyBy(_.key)      // same grouping as keyBy(0)
  val windowed = keyed.countWindow(3)  // fires only once 3 records have arrived for a key
  val summed = windowed.sum(1)         // sums field 1 (value) within each window
  summed.print()

  env.execute("test")
}
```
This produces the following result:
```
Record(01,6)
Record(02,12)
Record(04,24)
Record(05,30)
```
As expected, no result is produced for key "03": the count window expects 3 elements, but only two are present for that key because the filter in step1 removes Record(03,3).
What I would like is a count window with a timeout: if the expected number of elements has not arrived when the timeout expires, the window should still emit a partial result computed from the elements it already has.
With this behavior, in my example, a Record(03,15) would be produced when the timeout is reached.
I have followed both David's and Nirav's approaches, and here are the results.
1) Using a custom trigger:
Here I have reversed my initial logic. Instead of a count window, I use a time window whose duration equals the timeout, combined with a custom trigger that fires as soon as the expected number of elements has arrived.
And here is the trigger code:
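A minimal sketch of a trigger like the one described (not the original poster's code), assuming the Flink 1.x Scala DataStream API and processing-time windows. The names `CountOrWindowEndTrigger` and `SumLongs` are hypothetical. It counts elements in a `ReducingState`, fires and purges as soon as `maxCount` is reached, and otherwise fires whatever has accumulated when the window ends.

```scala
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.ReducingStateDescriptor
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// Adds up the per-window element counts kept in ReducingState.
class SumLongs extends ReduceFunction[java.lang.Long] {
  override def reduce(a: java.lang.Long, b: java.lang.Long): java.lang.Long =
    java.lang.Long.valueOf(a.longValue + b.longValue)
}

// Hypothetical trigger for a time window whose length equals the timeout:
// fires (and purges) as soon as `maxCount` elements have arrived, otherwise
// fires with whatever has accumulated when the window ends.
class CountOrWindowEndTrigger[T](val maxCount: Long) extends Trigger[T, TimeWindow] {

  private val countDesc =
    new ReducingStateDescriptor[java.lang.Long]("count", new SumLongs, Types.LONG)

  override def onElement(element: T, timestamp: Long, window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    // Guarantees a firing at the end of the window (the timeout), even if the count is never reached.
    ctx.registerProcessingTimeTimer(window.maxTimestamp())
    val count = ctx.getPartitionedState(countDesc)
    count.add(1L)
    if (count.get().longValue >= maxCount) {
      count.clear()
      TriggerResult.FIRE_AND_PURGE
    } else TriggerResult.CONTINUE
  }

  override def onProcessingTime(time: Long, window: TimeWindow,
                                ctx: Trigger.TriggerContext): TriggerResult = {
    // Timeout: emit what is left; a window already emptied by an earlier purge emits nothing.
    ctx.getPartitionedState(countDesc).clear()
    TriggerResult.FIRE_AND_PURGE
  }

  override def onEventTime(time: Long, window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
    ctx.deleteProcessingTimeTimer(window.maxTimestamp())
    ctx.getPartitionedState(countDesc).clear()
  }
}

// Possible usage with the question's `merged` stream (also needs
// org.apache.flink.streaming.api.scala._, TumblingProcessingTimeWindows and Time):
//   merged.keyBy(_.key)
//     .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
//     .trigger(new CountOrWindowEndTrigger[Record](3))
//     .sum(1)
```

Because tumbling windows are aligned to the clock, the timeout here is measured per window slot, not from the first element of each key. Also, with a bounded source like `fromElements`, the job may finish before a processing-time timer fires, so the partial result is easier to observe with an unbounded source.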
2) Using a process function:
Here all the logic (windowing, triggering, and summing) goes into the function:
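A minimal sketch of such a function (not the original poster's code), assuming the Flink 1.x Scala API, where `open(Configuration)` is still available. The class name `CountWithTimeoutFunction` is hypothetical. Per key, it keeps a running sum, a count, and a processing-time deadline set when the first record of a batch arrives, and it emits whichever comes first: the full count or the timeout.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

case class Record(key: String, value: Int)

// Hypothetical function: sums `value` per key and emits a Record once `maxCount`
// records have arrived, or when `timeoutMs` of processing time has passed since
// the first record of the current batch, whichever comes first.
class CountWithTimeoutFunction(val maxCount: Int, val timeoutMs: Long)
    extends KeyedProcessFunction[String, Record, Record] {

  // Per-key state for the batch that has not been emitted yet.
  @transient private var sum: ValueState[java.lang.Integer] = _
  @transient private var count: ValueState[java.lang.Integer] = _
  @transient private var deadline: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    sum = getRuntimeContext.getState(new ValueStateDescriptor[java.lang.Integer]("sum", Types.INT))
    count = getRuntimeContext.getState(new ValueStateDescriptor[java.lang.Integer]("count", Types.INT))
    deadline = getRuntimeContext.getState(new ValueStateDescriptor[java.lang.Long]("deadline", Types.LONG))
  }

  override def processElement(r: Record,
                              ctx: KeyedProcessFunction[String, Record, Record]#Context,
                              out: Collector[Record]): Unit = {
    if (deadline.value() == null) {
      // First record of a new batch: start the timeout clock.
      val d = ctx.timerService().currentProcessingTime() + timeoutMs
      ctx.timerService().registerProcessingTimeTimer(d)
      deadline.update(d)
    }
    val s = (if (sum.value() == null) 0 else sum.value().intValue) + r.value
    val c = (if (count.value() == null) 0 else count.value().intValue) + 1
    sum.update(s)
    count.update(c)
    if (c >= maxCount) {
      // Count reached before the timeout: cancel the timer and emit the full result.
      ctx.timerService().deleteProcessingTimeTimer(deadline.value().longValue)
      emitAndReset(r.key, out)
    }
  }

  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[String, Record, Record]#OnTimerContext,
                       out: Collector[Record]): Unit = {
    // Timeout: emit a partial result, but only for the batch this timer belongs to.
    val d = deadline.value()
    if (d != null && d.longValue == timestamp) emitAndReset(ctx.getCurrentKey, out)
  }

  // Emits the batch's sum and clears all state so the next record starts a new batch.
  private def emitAndReset(key: String, out: Collector[Record]): Unit = {
    out.collect(Record(key, sum.value().intValue))
    sum.clear(); count.clear(); deadline.clear()
  }
}

// Possible usage with the question's `merged` stream (needs org.apache.flink.streaming.api.scala._):
//   merged.keyBy(_.key).process(new CountWithTimeoutFunction(3, 5000L)).print()
```

Unlike the clock-aligned time window, this measures the timeout from each key's first record, at the cost of doing the summing by hand instead of reusing `sum(1)`.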
I think you can implement this use case with a ProcessFunction in which you keep a count property and a windowEnd property. Using these, you can decide when to collect the data.
I hope this is helpful.
You could also do this with a custom window Trigger that fires either when the count has been reached or when the timeout expires, effectively blending the built-in CountTrigger and EventTimeTrigger.
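A sketch of that idea, assuming the Flink 1.x Scala API. The class name `CountWithTimeoutTrigger` is hypothetical. It keeps the `GlobalWindows` assigner that `countWindow` uses and substitutes a processing-time timeout, measured from each batch's first element, for the event-time part: a `GlobalWindow` never ends, so an `EventTimeTrigger` on it would not fire until the end of the input, if ever.

```scala
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

// Hypothetical trigger for countWindow-style GlobalWindows: fires (and purges) when
// `maxCount` elements have arrived, or when `timeoutMs` of processing time has
// elapsed since the first element of the current batch, whichever comes first.
class CountWithTimeoutTrigger[T](val maxCount: Long, val timeoutMs: Long)
    extends Trigger[T, GlobalWindow] {

  private val countDesc = new ValueStateDescriptor[java.lang.Long]("count", Types.LONG)
  private val deadlineDesc = new ValueStateDescriptor[java.lang.Long]("deadline", Types.LONG)

  override def onElement(element: T, timestamp: Long, window: GlobalWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    val count = ctx.getPartitionedState(countDesc)
    val deadline = ctx.getPartitionedState(deadlineDesc)
    if (deadline.value() == null) {
      // First element of a new batch: arm the timeout.
      val d = ctx.getCurrentProcessingTime + timeoutMs
      ctx.registerProcessingTimeTimer(d)
      deadline.update(d)
    }
    val c = (if (count.value() == null) 0L else count.value().longValue) + 1
    if (c >= maxCount) {
      reset(ctx)
      TriggerResult.FIRE_AND_PURGE // same behavior as the count part of countWindow
    } else {
      count.update(c)
      TriggerResult.CONTINUE
    }
  }

  override def onProcessingTime(time: Long, window: GlobalWindow,
                                ctx: Trigger.TriggerContext): TriggerResult = {
    // Timeout for the current batch: emit the partial result.
    val deadline = ctx.getPartitionedState(deadlineDesc).value()
    if (deadline != null && deadline.longValue == time) {
      reset(ctx)
      TriggerResult.FIRE_AND_PURGE
    } else TriggerResult.CONTINUE
  }

  override def onEventTime(time: Long, window: GlobalWindow,
                           ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE

  override def clear(window: GlobalWindow, ctx: Trigger.TriggerContext): Unit = reset(ctx)

  // Cancels the pending timeout and clears the count so the next element starts a new batch.
  private def reset(ctx: Trigger.TriggerContext): Unit = {
    val deadline = ctx.getPartitionedState(deadlineDesc)
    if (deadline.value() != null) ctx.deleteProcessingTimeTimer(deadline.value().longValue)
    deadline.clear()
    ctx.getPartitionedState(countDesc).clear()
  }
}

// Possible usage, replacing countWindow(3) in the question (also needs
// org.apache.flink.streaming.api.scala._ and GlobalWindows):
//   merged.keyBy(_.key)
//     .window(GlobalWindows.create())
//     .trigger(new CountWithTimeoutTrigger[Record](3, 5000L))
//     .sum(1)
```

An event-time variant would register the deadline with `registerEventTimeTimer` and act in `onEventTime` instead, which needs timestamps and watermarks on the input.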