I have a stream of events about resources that looks like this:
id, type, count
1, view, 1
1, download, 3
2, view, 1
3, view, 1
1, download, 2
3, view, 1
I am trying to produce stats (totals) per resource, so if I get a stream like above, the result should be:
id, views, downloads
1, 1, 5
2, 1, 0
3, 2, 0
Now I wrote a ProcessFunction that calculates the totals like this:
public class CountTotals extends ProcessFunction<Event, ResourceTotals> {
private ValueState<ResourceTotals> totalsState;
@Override
public void open(Configuration config) throws Exception {
ValueStateDescriptor<ResourceTotals> totalsDescriptor = new ValueStateDescriptor<>("totals state", ResourceTotals.class);
totalsDescriptor.setQueryable("resource-totals");
totalsState = getRuntimeContext().getState(totalsDescriptor);
}
@Override
public void processElement(Event event, Context ctx, Collector<ResourceTotals> out) throws Exception {
ResourceTotals totals = totalsState.value();
if (totals == null) {
totals = new ResourceTotals();
totals.id = event.id;
}
switch (event.type) {
case "view":
totals.views += event.count;
break;
case "download":
totals.downloads += event.count;
}
totalsState.update(totals);
out.collect(totals);
}
}
As evident from the code, it will emit a new ResourceTotals for every event, but I would like to emit a total per resource once a minute and not more often.
I tried experimenting by using a global window and a trigger (ContinuousProcessingTimeTrigger), but couldn't get it to work. The issues I had with it were:
- How to express that I want the last event of the window?
- How not to end up storing all the ResourceTotals ever produced in that global window?
Any help would be appreciated.