I have the following problem: I receive messages which have to be grouped and each group of messages has to be processed. I can only detect the first message of each group. After that specific first message, the following messages belong to that group until the first message of the next group has been detected.
My approach to solving that problem was to write a custom trigger that returns FIRE_AND_PURGE when it detects the first message of a group (by overriding onElement). My goal was to assign all messages of one group to one window.
The problem with that approach is that the first message of each group is always assigned to the window of the preceding group.
What I get is: [aaaaaaab], [bbbbbbbbc], ... What I want is: [aaaaaaa], [bbbbbbbb], ...
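To make the intended grouping concrete, here is a minimal plain-Java sketch with no Flink involved. The class `GroupingSketch`, both method names, and the convention that an upper-case letter marks the first message of a group are made up for illustration only. `splitBeforeFirst` closes the current group before adding the marker message, which is the result I want. `fireAfterAdd` is a simplified model of what I seem to get: the marker is added first and the buffer is emitted afterwards, so the marker ends up at the end of the preceding group.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper for illustration; not part of the Flink job.
class GroupingSketch {

    // Desired behaviour: a marker message closes the previous group BEFORE
    // it is added, so it becomes the first element of the next group.
    static <T> List<List<T>> splitBeforeFirst(List<T> messages, Predicate<T> isFirstOfGroup) {
        List<List<T>> groups = new ArrayList<>();
        List<T> buffer = new ArrayList<>();
        for (T msg : messages) {
            if (isFirstOfGroup.test(msg) && !buffer.isEmpty()) {
                groups.add(buffer);
                buffer = new ArrayList<>();
            }
            buffer.add(msg);
        }
        // Flush the last group. On an unbounded stream there is no "end",
        // so the last group would stay open until the next marker arrives.
        if (!buffer.isEmpty()) {
            groups.add(buffer);
        }
        return groups;
    }

    // Simplified model of the unwanted behaviour: the marker message is
    // added first and the buffer is emitted and cleared afterwards.
    static <T> List<List<T>> fireAfterAdd(List<T> messages, Predicate<T> isFirstOfGroup) {
        List<List<T>> groups = new ArrayList<>();
        List<T> buffer = new ArrayList<>();
        for (T msg : messages) {
            buffer.add(msg);
            if (isFirstOfGroup.test(msg)) {
                groups.add(buffer);
                buffer = new ArrayList<>();
            }
        }
        if (!buffer.isEmpty()) {
            groups.add(buffer);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Assumption for this sketch: an upper-case letter is the first message of a group.
        List<String> input = Arrays.asList("A", "a", "a", "B", "b", "C");
        Predicate<String> isFirst = s -> Character.isUpperCase(s.charAt(0));

        System.out.println(splitBeforeFirst(input, isFirst)); // [[A, a, a], [B, b], [C]]
        System.out.println(fireAfterAdd(input, isFirst));     // [[A], [a, a, B], [b, C]]
    }
}
```

In the second output the marker "B" sits at the end of the group of "a" messages, which is the same pattern as the [aaaaaaab] result above.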
Relevant code from the main function:
esRawInputStream.filter(new FilterFunction<JsonNode>() {
    @Override
    public boolean filter(JsonNode doc) throws Exception {
        return // some condition
    }
}).keyBy(new KeySelector<JsonNode, String>() {
    @Override
    public String getKey(JsonNode doc) throws Exception {
        return doc.findValue("meta_charge_point_id").asText();
    }
}).window(GlobalWindows.create())
    .trigger(new CustomEventTrigger<JsonNode, GlobalWindow>())
    .fold(new SessionBucket(), new FoldFunction<JsonNode, SessionBucket>() {
        @Override
        public SessionBucket fold(SessionBucket b, JsonNode msg) throws Exception {
            b.addMessage(msg);
            return b;
        }
    }).addSink(new FileSink<SessionBucket>());
The trigger:
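import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
// The JsonNode import is omitted; it depends on the JSON library in use.

public class CustomEventTrigger<T, W extends Window> extends Trigger<T, W> {

    // Session id of the group currently being collected.
    // Note: as a plain field, this value is shared by all keys that the
    // same trigger instance handles.
    private String currentSessionId = "foo";

    @Override
    public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
        if (!(element instanceof JsonNode)) {
            throw new IllegalArgumentException("Expected a JsonNode but got: " + element);
        }
        JsonNode jsonElement = (JsonNode) element;

        TriggerResult res = TriggerResult.CONTINUE;
        String elementSessionId = jsonElement.findValue("ocpp_session_id").asText();
        // A changed session id marks the first message of the next group.
        if (!elementSessionId.equals(currentSessionId)) {
            currentSessionId = elementSessionId;
            res = TriggerResult.FIRE_AND_PURGE;
        }
        return res;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        // No timers are registered, so never fire here.
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        // No timers are registered, so never fire here.
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        // Nothing to clean up: the trigger keeps no window state or timers.
    }
}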