What does it mean that “broadcast state” unblocks

Posted 2019-06-11 01:09

Question:

From the Flink 1.5 release announcement, we know that Flink now supports "broadcast state", which is described as follows: "broadcast state unblocks the implementation of the “dynamic patterns” feature for Flink’s CEP library."

Does this mean that we can now use "broadcast state" to implement “dynamic patterns” without Flink CEP? Also, I don't understand what difference broadcast state makes when implementing “dynamic patterns” for Flink CEP. I would appreciate it if someone could give a code example that explains the difference.

=============

Update: testing a broadcast DataStream (created with the broadcast() operator) connected to a keyed DataStream

After testing in Flink 1.4.2, I found that a broadcast DataStream (created with the old broadcast() operator) can be connected to a keyed DataStream. The test code is below; all of the control stream events were broadcast to every operator instance. So it seems the old broadcast() can achieve the same functionality as the new "broadcast state".

public static void ConnectBroadToKeyedStream() throws Exception {
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(3);

    List<Tuple1<String>>
            controlData = new ArrayList<Tuple1<String>>();
    controlData.add(new Tuple1<String>("DROP"));
    controlData.add(new Tuple1<String>("IGNORE"));
    DataStream<Tuple1<String>> control = env.fromCollection(controlData);//.keyBy(0);

    List<Tuple1<String>>
            dataStreamData = new ArrayList<Tuple1<String>>();
    dataStreamData.add(new Tuple1<String>("data"));
    dataStreamData.add(new Tuple1<String>("DROP"));
    dataStreamData.add(new Tuple1<String>("artisans"));
    dataStreamData.add(new Tuple1<String>("IGNORE"));
    dataStreamData.add(new Tuple1<String>("IGNORE"));
    dataStreamData.add(new Tuple1<String>("IGNORE"));
    dataStreamData.add(new Tuple1<String>("IGNORE"));

    // DataStream<String> data2 = env.fromElements("data", "DROP", "artisans", "IGNORE");
    DataStream<Tuple1<String>> keyedDataStream = env.fromCollection(dataStreamData).keyBy(0);

    DataStream<String> result = control
            .broadcast()
            .connect(keyedDataStream)
            .flatMap(new MyCoFlatMap());
    result.print();
    env.execute();
}

private static final class MyCoFlatMap
        implements CoFlatMapFunction<Tuple1<String>, Tuple1<String>, String> {
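    // Ordinary instance field: local to each parallel instance and not managed (checkpointed) by Flink.
    private final HashSet<Tuple1<String>> blacklist = new HashSet<>();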

    @Override
    public void flatMap1(Tuple1<String> control_value, Collector<String> out) {
        blacklist.add(control_value);
        out.collect("listed " + control_value);
    }

    @Override
    public void flatMap2(Tuple1<String> data_value, Collector<String> out) {

        if (blacklist.contains(data_value)) {
            out.collect("skipped " + data_value);
        } else {
            out.collect("passed " + data_value);
        }
    }
}

Below is the test result.

1> passed (data)
1> passed (DROP)
3> passed (artisans)
3> passed (IGNORE)
3> passed (IGNORE)
3> passed (IGNORE)
3> passed (IGNORE)
3> listed (DROP)
3> listed (IGNORE)
1> listed (DROP)
1> listed (IGNORE)
2> listed (DROP)
2> listed (IGNORE)
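
In the output above, every "passed" line of a subtask comes before that subtask's "listed" lines, which suggests that each instance processed its data elements before the control elements arrived, so nothing was ever skipped. Here is a minimal plain-Java sketch of how arrival order changes the result when the blacklist is an ordinary field. It does not use Flink; `ArrivalOrderDemo` and `Instance` are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Plain-Java model (not Flink code) of a single parallel instance of MyCoFlatMap. */
final class ArrivalOrderDemo {

    /** Mirrors MyCoFlatMap: the blacklist is an ordinary field, local to one instance. */
    static final class Instance {
        private final Set<String> blacklist = new HashSet<>();
        final List<String> output = new ArrayList<>();

        // Corresponds to flatMap1: remember the control value.
        void onControl(String value) {
            blacklist.add(value);
            output.add("listed " + value);
        }

        // Corresponds to flatMap2: check the data value against what is known so far.
        void onData(String value) {
            output.add((blacklist.contains(value) ? "skipped " : "passed ") + value);
        }
    }

    /** The data element reaches the instance before the control element. */
    static List<String> dataFirst() {
        Instance instance = new Instance();
        instance.onData("DROP");
        instance.onControl("DROP");
        return instance.output;
    }

    /** The control element reaches the instance first. */
    static List<String> controlFirst() {
        Instance instance = new Instance();
        instance.onControl("DROP");
        instance.onData("DROP");
        return instance.output;
    }

    public static void main(String[] args) {
        System.out.println(dataFirst());    // [passed DROP, listed DROP]
        System.out.println(controlFirst()); // [listed DROP, skipped DROP]
    }
}
```

The same element is passed or skipped depending only on which input is read first, so this test shows that control events reach every instance but does not show that the blacklist is ever applied.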

https://data-artisans.com/blog/apache-flink-1-5-0-release-announcement

Answer 1:

Without broadcast state, two Flink data streams cannot be processed together in a stateful way unless they are keyed in exactly the same way. A broadcast stream can be connected to a keyed stream, but if you then try to use keyed state in a RichCoFlatMapFunction, for example, that will fail.

What is frequently desired is to be able to have one stream with dynamic "rules" that are to be applied to every event on another stream, regardless of key. There needed to be a new kind of managed Flink state in which these rules could be stored. With broadcast state this can now be done in a straightforward way.

With this feature now in place, work on support for dynamic patterns in CEP can begin.
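
The idea in this answer, rules from one stream applied to every event of another stream regardless of key, can be sketched as a plain-Java model. This is not Flink code: `BroadcastRulesModel`, `Instance`, and the rule format (a rule name mapped to a banned word) are assumptions made for illustration. The model also leaves out the main reason for using managed state, which is that Flink checkpoints and restores broadcast state, whereas an ordinary field is not managed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model (not Flink code) of rules broadcast to every parallel instance. */
final class BroadcastRulesModel {

    /** One parallel instance; the map stands in for that instance's copy of the broadcast state. */
    static final class Instance {
        private final Map<String, String> rules = new LinkedHashMap<>();

        /** Broadcast side: the only place where the rules are modified. */
        void onRule(String ruleName, String bannedWord) {
            rules.put(ruleName, bannedWord);
        }

        /** Keyed side: reads the rules through an unmodifiable view. */
        String onEvent(String payload) {
            Map<String, String> view = Collections.unmodifiableMap(rules);
            for (Map.Entry<String, String> rule : view.entrySet()) {
                if (payload.contains(rule.getValue())) {
                    return "blocked by " + rule.getKey();
                }
            }
            return "passed";
        }
    }

    private final List<Instance> instances = new ArrayList<>();

    BroadcastRulesModel(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            instances.add(new Instance());
        }
    }

    /** A rule is delivered to all instances, so each one holds the same rules. */
    void broadcastRule(String ruleName, String bannedWord) {
        for (Instance instance : instances) {
            instance.onRule(ruleName, bannedWord);
        }
    }

    /** An event is delivered only to the instance that owns its key. */
    String sendEvent(String key, String payload) {
        int index = Math.floorMod(key.hashCode(), instances.size());
        return instances.get(index).onEvent(payload);
    }

    public static void main(String[] args) {
        BroadcastRulesModel job = new BroadcastRulesModel(3);
        job.broadcastRule("no-drop", "DROP");
        System.out.println(job.sendEvent("user-1", "DROP table"));     // blocked by no-drop
        System.out.println(job.sendEvent("user-2", "select 1"));       // passed
        System.out.println(job.sendEvent("user-3", "please DROP it")); // blocked by no-drop
    }
}
```

Because every instance receives every rule, the result for an event does not depend on which instance its key is routed to. Writing rules only on the broadcast side and reading them through a read-only view on the keyed side is what keeps the copies identical across instances.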



Answer 2:

Here's a code sample which implements both Flink's original broadcast method with no arguments and the broadcast state newly introduced in Flink 1.5.0. https://gist.github.com/syhily/932e0d1e0f12b3e951236d6c36e5a7ed

As far as I can tell, broadcast state can be used without Flink CEP, as the code linked above shows.

The original DataStream broadcast method (with no arguments) returns a plain DataStream, so connecting it to another stream gives ConnectedStreams rather than a BroadcastConnectedStream. This corresponds to the original coGroup-style design. After connecting the metrics stream with a broadcast rule stream, we can use the other transformations defined on ConnectedStreams. With keyBy, for example, elements of the two streams that have the same key are processed by the same parallel instance of the CoProcessFunction, so the CoProcessFunction can have its own local storage: a custom data structure held in a field, rather than a map state accessed through the ReadOnlyContext.

Broadcast state is created by calling the broadcast method with a set of MapStateDescriptors, which means the broadcast stream can be connected to other streams many times. Each resulting BroadcastConnectedStream can access its own broadcast state in the process function through a unique MapStateDescriptor.

I think these are the key differences between broadcast with no arguments and broadcast state.