Calculate totals and emit them periodically in Flink

Posted 2019-09-01 15:09


I have a stream of events about resources that looks like this:

id, type,      count
1,  view,      1
1,  download,  3
2,  view,      1
3,  view,      1
1,  download,  2
3,  view,      1

I am trying to produce stats (totals) per resource, so if I get a stream like above, the result should be:

id, views, downloads
1,  1,     5
2,  1,     0
3,  2,     0
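As a quick sanity check of the expected totals, here is a plain-Java (non-Flink) sketch that sums the sample events above per id and type. `TotalsWorkedExample`, `Totals` and the field names are hypothetical stand-ins, not the question's actual classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java worked example (no Flink). Class and field names are hypothetical
// stand-ins for the question's Event and ResourceTotals.
class TotalsWorkedExample {
    static final class Event {
        final int id;
        final String type;
        final long count;

        Event(int id, String type, long count) {
            this.id = id;
            this.type = type;
            this.count = count;
        }
    }

    static final class Totals {
        long views;
        long downloads;
    }

    // Sums the counts per resource id, split by event type.
    static Map<Integer, Totals> totals(List<Event> events) {
        Map<Integer, Totals> result = new TreeMap<>(); // iterates in id order
        for (Event e : events) {
            Totals t = result.computeIfAbsent(e.id, k -> new Totals());
            switch (e.type) {
                case "view":
                    t.views += e.count;
                    break; // without break, a view would also count as a download
                case "download":
                    t.downloads += e.count;
                    break;
                default:
                    break; // other event types are ignored
            }
        }
        return result;
    }

    // The sample stream from the question, in arrival order.
    static List<Event> sample() {
        List<Event> events = new ArrayList<>();
        events.add(new Event(1, "view", 1));
        events.add(new Event(1, "download", 3));
        events.add(new Event(2, "view", 1));
        events.add(new Event(3, "view", 1));
        events.add(new Event(1, "download", 2));
        events.add(new Event(3, "view", 1));
        return events;
    }

    public static void main(String[] args) {
        System.out.println("id, views, downloads");
        for (Map.Entry<Integer, Totals> entry : totals(sample()).entrySet()) {
            Totals t = entry.getValue();
            System.out.println(entry.getKey() + ", " + t.views + ", " + t.downloads);
        }
    }
}
```

Running `main` prints the same values as the result table above, one row per id.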

Now I wrote a ProcessFunction that calculates the totals like this:

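import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class CountTotals extends ProcessFunction<Event, ResourceTotals> {
    private ValueState<ResourceTotals> totalsState;

    public void open(Configuration config) throws Exception {
        ValueStateDescriptor<ResourceTotals> totalsDescriptor = new ValueStateDescriptor<>("totals state", ResourceTotals.class);
        totalsState = getRuntimeContext().getState(totalsDescriptor); // keyed state: needs keyBy upstream
    }

    public void processElement(Event event, Context ctx, Collector<ResourceTotals> out) throws Exception {
        ResourceTotals totals = totalsState.value();
        if (totals == null) {
            totals = new ResourceTotals();
        }
        switch (event.type) {
            case "view":
                totals.views += event.count;
                break; // without break, views would also be added to downloads
            case "download":
                totals.downloads += event.count;
                break;
        }
        totalsState.update(totals);
        out.collect(totals); // emits the updated totals on every event
    }
}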

As the code shows, it emits a new ResourceTotals for every event, but I would like to emit the total for each resource once a minute and no more often.

I experimented with a global window and a trigger (ContinuousProcessingTimeTrigger), but couldn't get it to work. The issues I ran into were:

  1. How do I express that I want only the last event of the window?
  2. How do I avoid storing every ResourceTotals ever produced in that global window?

Any help would be appreciated.


You can use a timer to emit the values in totalsState once a minute. Since I don't see any timestamps in your data stream, I imagine you would use a processing-time timer.
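A minimal sketch of the timer approach, assuming the Flink 1.x DataStream API (where `open(Configuration)` is the usual override) and a stream keyed by resource id. It uses `KeyedProcessFunction`, the variant intended for keyed streams. `PeriodicTotalsExample`, `PeriodicTotals`, `endOfInterval` and the stand-in `Event`/`ResourceTotals` fields are hypothetical names for illustration.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Sketch assuming the Flink 1.x DataStream API. Event and ResourceTotals are
// minimal hypothetical stand-ins for the question's classes.
class PeriodicTotalsExample {
    public static class Event {
        public int id;
        public String type;
        public long count;
    }

    public static class ResourceTotals {
        public int id;
        public long views;
        public long downloads;
    }

    // Keyed by resource id. Updates the running totals on every event, but only
    // emits them when a processing-time timer fires: at most once a minute per id.
    public static class PeriodicTotals extends KeyedProcessFunction<Integer, Event, ResourceTotals> {
        static final long INTERVAL_MS = 60_000L;

        private transient ValueState<ResourceTotals> totalsState;

        // Next multiple of INTERVAL_MS strictly after `now`.
        static long endOfInterval(long now) {
            return now - (now % INTERVAL_MS) + INTERVAL_MS;
        }

        @Override
        public void open(Configuration config) {
            totalsState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("totals state", ResourceTotals.class));
        }

        @Override
        public void processElement(Event event, Context ctx, Collector<ResourceTotals> out)
                throws Exception {
            ResourceTotals totals = totalsState.value();
            if (totals == null) {
                totals = new ResourceTotals();
                totals.id = event.id;
            }
            switch (event.type) {
                case "view":
                    totals.views += event.count;
                    break;
                case "download":
                    totals.downloads += event.count;
                    break;
                default:
                    break;
            }
            totalsState.update(totals);

            // Flink keeps at most one timer per key and timestamp, so registering the
            // same end-of-minute timestamp for every event yields a single firing.
            long now = ctx.timerService().currentProcessingTime();
            ctx.timerService().registerProcessingTimeTimer(endOfInterval(now));
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResourceTotals> out)
                throws Exception {
            ResourceTotals totals = totalsState.value();
            if (totals != null) {
                out.collect(totals); // current total for this id
            }
        }
    }
}
```

Wiring it up would look roughly like `events.keyBy(e -> e.id).process(new PeriodicTotalsExample.PeriodicTotals())`. Aligning timers to minute boundaries lets Flink's per-key, per-timestamp timer deduplication enforce "at most once a minute". With this design, a resource that receives no events in a given minute emits nothing; if you want a row every minute regardless, register the next timer from `onTimer`.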

An alternative would be to replace the ProcessFunction with a TimeWindow along with a ReduceFunction that retains the last event.
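One possible reading of the window alternative, sketched under stated assumptions: the window is applied to the running totals that `CountTotals` emits (so the latest value in each window is the current total), the totals carry an `id` field to key on, and the Flink 1.x DataStream API is used. `WindowedTotalsExample`, `KeepLatest` and `oncePerMinute` are hypothetical names.

```java
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Sketch assuming the Flink 1.x DataStream API; ResourceTotals is a hypothetical stand-in.
class WindowedTotalsExample {
    public static class ResourceTotals {
        public int id;
        public long views;
        public long downloads;
    }

    // Reducing state passes the value accumulated so far first and the new element
    // second, so returning the second argument keeps the latest value in the window.
    public static class KeepLatest implements ReduceFunction<ResourceTotals> {
        @Override
        public ResourceTotals reduce(ResourceTotals earlier, ResourceTotals later) {
            return later;
        }
    }

    // runningTotals: per-event running totals, such as CountTotals emits.
    // Result: at most one ResourceTotals per id per one-minute processing-time window.
    static DataStream<ResourceTotals> oncePerMinute(DataStream<ResourceTotals> runningTotals) {
        return runningTotals
                .keyBy(t -> t.id)
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                .reduce(new KeepLatest());
    }
}
```

Under these assumptions, a tumbling window is discarded after it fires and the reduce holds a single value per id, so state does not grow the way it can in a global window that is never purged.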

In either case you could consider keying the stream by both the ID and type fields, which should simplify your state management a bit.
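A sketch of keying by both fields, assuming the Flink 1.x DataStream API and combining it with the once-a-minute timer from the first suggestion. With one key per (id, type), the state shrinks to a single running count and the switch on the event type disappears. `PerTypeTotalsExample`, `ByIdAndType` and `PerTypeTotals` are hypothetical names, and `Event` is a minimal stand-in.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Sketch assuming the Flink 1.x DataStream API; Event is a hypothetical stand-in.
class PerTypeTotalsExample {
    public static class Event {
        public int id;
        public String type;
        public long count;
    }

    // Composite key: (resource id, event type). A named class keeps the generic
    // key type visible to Flink's type extraction.
    public static class ByIdAndType implements KeySelector<Event, Tuple2<Integer, String>> {
        @Override
        public Tuple2<Integer, String> getKey(Event event) {
            return Tuple2.of(event.id, event.type);
        }
    }

    // State is one running count per (id, type); emits (id, type, total) at most once a minute.
    public static class PerTypeTotals
            extends KeyedProcessFunction<Tuple2<Integer, String>, Event, Tuple3<Integer, String, Long>> {
        private transient ValueState<Long> countState;

        @Override
        public void open(Configuration config) {
            countState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void processElement(Event event, Context ctx, Collector<Tuple3<Integer, String, Long>> out)
                throws Exception {
            Long current = countState.value();
            countState.update((current == null ? 0L : current) + event.count);
            // Timer at the next minute boundary; duplicates for the same key collapse.
            long now = ctx.timerService().currentProcessingTime();
            ctx.timerService().registerProcessingTimeTimer(now - (now % 60_000L) + 60_000L);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple3<Integer, String, Long>> out)
                throws Exception {
            Tuple2<Integer, String> key = ctx.getCurrentKey();
            out.collect(Tuple3.of(key.f0, key.f1, countState.value()));
        }
    }
}
```

Usage would be along the lines of `events.keyBy(new PerTypeTotalsExample.ByIdAndType()).process(new PerTypeTotalsExample.PerTypeTotals())`. The trade-off is the output shape: one row per (id, type) rather than one row with views and downloads columns, so producing the original table would need a further step downstream.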


Timers are part of the state that Flink checkpoints and restores, so they are fault tolerant just like the ValueState.