Calculate totals and emit periodically in Flink

Posted 2019-09-01 15:09

Question:

I have a stream of events about resources that looks like this:

id, type,      count
1,  view,      1
1,  download,  3
2,  view,      1
3,  view,      1
1,  download,  2
3,  view,      1

I am trying to produce stats (totals) per resource, so for a stream like the one above, the result should be:

id, views, downloads
1,  1,     5
2,  1,     0
3,  2,     0

Now I wrote a ProcessFunction that calculates the totals like this:

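import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Must be applied to a stream keyed by event id, because ValueState is keyed
// state: Flink keeps one ResourceTotals per resource.
public class CountTotals extends ProcessFunction<Event, ResourceTotals> {
    private ValueState<ResourceTotals> totalsState;

    @Override
    public void open(Configuration config) throws Exception {
        ValueStateDescriptor<ResourceTotals> totalsDescriptor = new ValueStateDescriptor<>("totals state", ResourceTotals.class);
        totalsDescriptor.setQueryable("resource-totals");
        totalsState = getRuntimeContext().getState(totalsDescriptor);
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<ResourceTotals> out) throws Exception {
        // First event for this resource: start from zero totals.
        ResourceTotals totals = totalsState.value();
        if (totals == null) {
            totals = new ResourceTotals();
            totals.id = event.id;
        }
        switch (event.type) {
            case "view":
                totals.views += event.count;
                break;
            case "download":
                totals.downloads += event.count;
                break;
        }
        totalsState.update(totals);
        // Emits the updated totals on every single event.
        out.collect(totals);
    }
}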

As the code shows, it emits a new ResourceTotals for every event, but I would like to emit the total for each resource once a minute and no more often.

I experimented with a global window and a trigger (ContinuousProcessingTimeTrigger), but couldn't get it to work. The issues I ran into were:

  1. How do I express that I want only the last event of the window?
  2. How do I avoid storing every ResourceTotals ever produced in that global window?

Any help would be appreciated.

Answer 1:

You can use a timer to emit the values in totalsState once a minute: register the timer in processElement and do the emitting in onTimer instead of on every event. Since I don't see any timestamps in your data stream, I imagine you would use a processing-time timer.
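To make the timer idea concrete without a Flink cluster, here is a plain-Java simulation of the pattern. All class and method names (TimerFlushSketch, Totals, advanceTo, INTERVAL_MS) are hypothetical. processElement updates the per-resource totals and registers a timer at the next minute boundary, which in Flink would be `ctx.timerService().registerProcessingTimeTimer(...)`; the emission that would live in onTimer happens in advanceTo. The HashMap and TreeMap only stand in for keyed state and Flink's timer service, so treat this as a sketch of the logic, not Flink code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Framework-free simulation of "accumulate per key, emit from a processing-time timer".
class TimerFlushSketch {
    static final long INTERVAL_MS = 60_000L; // emit at most once a minute

    // Simplified stand-in for ResourceTotals from the question.
    static final class Totals {
        final int id;
        long views;
        long downloads;
        Totals(int id) { this.id = id; }
        @Override public String toString() { return id + "," + views + "," + downloads; }
    }

    // Stands in for keyed ValueState<ResourceTotals>: one entry per resource id.
    private final Map<Integer, Totals> state = new HashMap<>();
    // Stands in for the timer service: fire time -> ids with a timer at that time.
    private final TreeMap<Long, Set<Integer>> timers = new TreeMap<>();
    // Stands in for the Collector: what would be sent downstream.
    final List<String> emitted = new ArrayList<>();

    void processElement(int id, String type, long count, long nowMs) {
        Totals t = state.computeIfAbsent(id, Totals::new);
        switch (type) {
            case "view": t.views += count; break;
            case "download": t.downloads += count; break;
            default: break; // ignore unknown event types
        }
        // Round up to the next minute boundary. The Set keeps at most one timer per
        // (id, time), as Flink does, so many events in a minute yield one emission.
        long fireAt = (nowMs / INTERVAL_MS + 1) * INTERVAL_MS;
        timers.computeIfAbsent(fireAt, k -> new HashSet<>()).add(id);
    }

    // Advance the simulated clock and fire every due timer (the onTimer step).
    void advanceTo(long nowMs) {
        while (!timers.isEmpty() && timers.firstKey() <= nowMs) {
            List<Integer> ids = new ArrayList<>(timers.pollFirstEntry().getValue());
            Collections.sort(ids); // deterministic output order for the demo
            for (int id : ids) {
                emitted.add(state.get(id).toString());
            }
        }
    }

    public static void main(String[] args) {
        TimerFlushSketch s = new TimerFlushSketch();
        s.processElement(1, "view", 1, 1_000);
        s.processElement(1, "download", 3, 2_000);
        s.processElement(2, "view", 1, 3_000);
        s.processElement(3, "view", 1, 4_000);
        s.processElement(1, "download", 2, 5_000);
        s.processElement(3, "view", 1, 6_000);
        s.advanceTo(60_000);
        s.emitted.forEach(System.out::println); // prints 1,1,5 then 2,1,0 then 3,2,0
    }
}
```

Because a timer is only registered when an event arrives, a resource with no new events in a given minute is not re-emitted. If you want every resource emitted every minute regardless, one option is to register the next timer from inside onTimer.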

An alternative would be to replace the ProcessFunction with a TimeWindow along with a ReduceFunction that retains the last event.
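One reading of this alternative that still yields running totals is to key the output of CountTotals by id again and apply a one-minute tumbling processing-time window whose ReduceFunction keeps the newer of two values, roughly `.window(TumblingProcessingTimeWindows.of(Time.minutes(1))).reduce((a, b) -> b)` in Flink 1.x. Because a ReduceFunction is applied incrementally, the window holds a single value per key rather than every ResourceTotals, which addresses the question's second concern. The plain-Java simulation below mimics that behaviour; its names (WindowReduceSketch, keepLatest, advanceTo) are hypothetical, and the running totals are shown as "id,views,downloads" strings for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simulation of a keyed one-minute tumbling processing-time window whose
// reduce step keeps only the newest running total per resource.
class WindowReduceSketch {
    static final long WINDOW_MS = 60_000L;

    // window max timestamp -> (resource id -> reduced value). Only one value per id
    // is stored, mirroring how a ReduceFunction aggregates incrementally.
    private final TreeMap<Long, Map<Integer, String>> windows = new TreeMap<>();
    final List<String> emitted = new ArrayList<>();

    // The reduce step described in the answer: of two values, retain the later one.
    static String keepLatest(String earlier, String later) {
        return later;
    }

    void add(int id, String runningTotal, long nowMs) {
        long windowStart = nowMs - (nowMs % WINDOW_MS);
        // Flink's ProcessingTimeTrigger fires at window.maxTimestamp(), i.e. end - 1.
        long maxTimestamp = windowStart + WINDOW_MS - 1;
        windows.computeIfAbsent(maxTimestamp, k -> new TreeMap<>())
               .merge(id, runningTotal, WindowReduceSketch::keepLatest);
    }

    // Advance the simulated clock; each due window emits one value per id.
    void advanceTo(long nowMs) {
        while (!windows.isEmpty() && windows.firstKey() <= nowMs) {
            emitted.addAll(windows.pollFirstEntry().getValue().values());
        }
    }

    public static void main(String[] args) {
        WindowReduceSketch w = new WindowReduceSketch();
        // Running totals as CountTotals would emit them, one per input event.
        w.add(1, "1,1,0", 1_000);
        w.add(1, "1,1,3", 2_000);
        w.add(2, "2,1,0", 3_000);
        w.add(3, "3,1,0", 4_000);
        w.add(1, "1,1,5", 5_000);
        w.add(3, "3,2,0", 6_000);
        w.advanceTo(59_999);
        w.emitted.forEach(System.out::println); // prints 1,1,5 then 2,1,0 then 3,2,0
    }
}
```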

In either case you could consider keying the stream by both the ID and type fields, which should simplify your state management a bit.
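Keying by both fields means the state behind each key is a single counter, so the switch over event types in CountTotals disappears. In Flink this would be a KeySelector returning both id and type, with something like a ValueState<Long> per key. The plain-Java sketch below assumes a hypothetical "id,type" string as the composite key and a TreeMap as the stand-in for per-key state; the class and method names are illustrative only.

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch of keying by (id, type): each key's state is one running count.
class CompositeKeySketch {
    // Stands in for a ValueState<Long> scoped to the key "id,type" (a hypothetical
    // string encoding of the composite key). TreeMap only fixes the print order.
    private final Map<String, Long> countState = new TreeMap<>();

    // Equivalent of processElement: add to this key's counter and return the new count.
    long processElement(int id, String type, long count) {
        return countState.merge(id + "," + type, count, Long::sum);
    }

    Map<String, Long> counts() {
        return countState;
    }

    public static void main(String[] args) {
        CompositeKeySketch s = new CompositeKeySketch();
        s.processElement(1, "view", 1);
        s.processElement(1, "download", 3);
        s.processElement(2, "view", 1);
        s.processElement(3, "view", 1);
        s.processElement(1, "download", 2);
        s.processElement(3, "view", 1);
        System.out.println(s.counts()); // {1,download=5, 1,view=1, 2,view=1, 3,view=2}
    }
}
```

The trade-off is the output shape: you get one row per (id, type), and combinations that never occurred (such as downloads for resources 2 and 3) have no row at all, so producing the views/downloads layout from the question needs a downstream step that merges the per-type counts.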

Updated:

Yes, timers are part of the state that gets checkpointed and restored by Flink.