How to assign groups of messages to windows by detecting the first message of each group

Published 2019-08-16 06:27

Question:

I have the following problem: I receive messages which have to be grouped and each group of messages has to be processed. I can only detect the first message of each group. After that specific first message, the following messages belong to that group until the first message of the next group has been detected.

My approach to solving this was to write a custom trigger that returns FIRE_AND_PURGE when it detects the first message of a group (by overriding onElement). My goal was to assign all messages of one group to one window.

The problem with that approach is that the first message of each group is always assigned to the window of the preceding group.

What I get is: [aaaaaaab], [bbbbbbbbc] ... What I want is: [aaaaaaa], [bbbbbbbb] ...
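Why does the boundary message land in the previous pane? In Flink's window operator, an element is added to the window contents before the trigger's onElement is invoked, so a FIRE_AND_PURGE returned for the first message of a new group flushes a pane that already contains that message. Here is a minimal plain-Java simulation of that ordering (no Flink involved; the class name and the pre-seeded first group are mine, for illustration only):

```java
import java.util.ArrayList;
import java.util.List;

public class TriggerOrdering {
    // Simulates a window pane plus a FIRE_AND_PURGE trigger keyed on group changes.
    static List<String> firedPanes(String input, char firstGroup) {
        char currentGroup = firstGroup;           // like currentSessionId, pre-seeded with the first group
        StringBuilder pane = new StringBuilder(); // the window contents
        List<String> fired = new ArrayList<>();
        for (char c : input.toCharArray()) {
            pane.append(c);                       // 1) the element is added to the window first
            if (c != currentGroup) {              // 2) only then does the trigger see it
                currentGroup = c;
                fired.add(pane.toString());       // FIRE_AND_PURGE: the pane already holds the boundary element
                pane.setLength(0);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // each emitted pane ends with the first message of the *next* group
        System.out.println(firedPanes("aaabbbbcc", 'a')); // prints [aaab, bbbc]
    }
}
```

This reproduces exactly the off-by-one pattern described above: firing from inside onElement can never exclude the element that caused the fire.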

Relevant code from the main function:

            esRawInputStream.filter(new FilterFunction<JsonNode>() {
                @Override
                public boolean filter(JsonNode doc) throws Exception {
                    return // some condition
                }
            }).keyBy(new KeySelector<JsonNode, String>() {
                @Override
                public String getKey(JsonNode doc) throws Exception {
                    return doc.findValue("meta_charge_point_id").asText();
                }
            }).window(GlobalWindows.create())
                    .trigger(new CustomEventTrigger<JsonNode, GlobalWindow>())
                    .fold(new SessionBucket(), new FoldFunction<JsonNode, SessionBucket>() {
                        @Override
                        public SessionBucket fold(SessionBucket b, JsonNode msg) throws Exception {
                            b.addMessage(msg);
                            return b;
                        }
                    }).addSink(new FileSink<SessionBucket>());

The trigger:

public class CustomEventTrigger<T, W extends Window> extends Trigger<T, W> {
    private String currentSessionId = "foo";

    @Override
    public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
        JsonNode jsonElement;
        if (element instanceof JsonNode) {
            jsonElement = (JsonNode) element;
        } else {
            throw new IllegalArgumentException("Expected JsonNode, got " + element.getClass());
        }
        TriggerResult res = TriggerResult.CONTINUE;
        String elementSessionId = jsonElement.findValue("ocpp_session_id").asText();
        if (!elementSessionId.equals(currentSessionId)) {
            currentSessionId = elementSessionId;
            res = TriggerResult.FIRE_AND_PURGE;
        }
        return res;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        // no trigger state to clean up
    }
}

Answer 1:

This use case isn't very well suited to Flink's window API. Let me suggest an alternative, which is to do this with a stateful flatmap function.

Here's an example of what that might look like:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Segmenting {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.fromElements(1, 2, 2, 3, 3, 3, 1, 4, 4, 4, 4, 2, 2)
            // key the stream so we can use keyed state
            .keyBy(event -> 1)
            .flatMap(new RichFlatMapFunction<Integer, List<Integer>>() {
                private transient ValueState<Integer> currentValue;
                private transient ListState<Integer> list;

                @Override
                public void open(Configuration parameters) throws Exception {
                    currentValue = getRuntimeContext().getState(new ValueStateDescriptor<>("currentValue", Integer.class));
                    list = getRuntimeContext().getListState(new ListStateDescriptor<>("list", Integer.class));
                }

                @Override
                public void flatMap(Integer event, Collector<List<Integer>> collector) throws Exception {
                    Integer value = currentValue.value();

                    // use equals(), not ==: Integer identity comparison only works for
                    // small cached values (and value is null for the first event)
                    if (event.equals(value)) {
                        list.add(event);
                    } else {
                        if (value != null) {
                            List<Integer> result = new ArrayList<>();
                            list.get().forEach(result::add);
                            collector.collect(result);
                        }
                        currentValue.update(event);
                        list.clear();
                        list.add(event);
                    }
                }
            })
            .print();

        env.execute();
    }
}

The output is

[1]
[2, 2]
[3, 3, 3]
[1]
[4, 4, 4, 4]

By the way, I'm assuming the data is in order, and am avoiding parallel processing so as to keep it in order. For most stream processing applications that would be an unrealistic assumption. If your data will be out-of-order, you can use this as a starting point, but the final solution will be more complex.
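One more caveat: as written, the flatmap never emits the final group (the trailing [2, 2] in the sample input is missing from the output), because nothing flushes the list state when the input ends; in a real job you would need an end-of-session marker or a timer. The core run detection, including that final flush, can be sketched in plain Java (illustration only; the class and method names are mine, not part of any Flink API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class Segmenter {
    // Splits a sequence into maximal runs of equal values, like the flatmap above,
    // but with an explicit end-of-input flush for the last run.
    static List<List<Integer>> segment(List<Integer> events) {
        List<List<Integer>> out = new ArrayList<>();
        Integer current = null;          // value of the run being collected
        List<Integer> run = new ArrayList<>();
        for (Integer e : events) {
            if (!e.equals(current)) {    // a new run starts (also true for the very first element)
                if (current != null) {
                    out.add(run);
                    run = new ArrayList<>();
                }
                current = e;
            }
            run.add(e);
        }
        if (!run.isEmpty()) {
            out.add(run);                // flush the final run; the streaming version drops this
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(segment(Arrays.asList(1, 2, 2, 3, 3, 3, 1, 4, 4, 4, 4, 2, 2)));
        // prints [[1], [2, 2], [3, 3, 3], [1], [4, 4, 4, 4], [2, 2]]
    }
}
```

Note the extra [2, 2] at the end compared to the streaming output above.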