According to the Flink 1.5 release announcement, Flink now supports "broadcast state", and the announcement says that "broadcast state unblocks the implementation of the “dynamic patterns” feature for Flink’s CEP library."
Does this mean we can now use broadcast state to implement "dynamic patterns" without Flink CEP? I also don't understand what the difference is between implementing "dynamic patterns" for Flink CEP with and without broadcast state. I would appreciate it if someone could give a code example that explains the difference.
=============
Update: testing a broadcast DataStream (created with the old `broadcast()` operator) connected to a keyed DataStream
After testing in Flink 1.4.2, I found that a broadcast DataStream created with the old `broadcast()` operator can be connected to a keyed DataStream. The test code is below. All of the control stream events were broadcast to all operator instances, so it seems the old `broadcast()` can achieve the same functionality as the new "broadcast state".
```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

// The enclosing class name is hypothetical; the test only needs the method and the inner class.
public class BroadcastToKeyedTest {

    public static void main(String[] args) throws Exception {
        ConnectBroadToKeyedStream();
    }

    public static void ConnectBroadToKeyedStream() throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(3);

        // Control stream: the values to blacklist.
        List<Tuple1<String>> controlData = new ArrayList<>();
        controlData.add(new Tuple1<>("DROP"));
        controlData.add(new Tuple1<>("IGNORE"));
        DataStream<Tuple1<String>> control = env.fromCollection(controlData);

        // Data stream, keyed by its single field.
        List<Tuple1<String>> dataStreamData = new ArrayList<>();
        dataStreamData.add(new Tuple1<>("data"));
        dataStreamData.add(new Tuple1<>("DROP"));
        dataStreamData.add(new Tuple1<>("artisans"));
        dataStreamData.add(new Tuple1<>("IGNORE"));
        dataStreamData.add(new Tuple1<>("IGNORE"));
        dataStreamData.add(new Tuple1<>("IGNORE"));
        dataStreamData.add(new Tuple1<>("IGNORE"));
        DataStream<Tuple1<String>> keyedDataStream = env.fromCollection(dataStreamData).keyBy(0);

        // broadcast() with no arguments sends every control element to every parallel instance.
        DataStream<String> result = control
            .broadcast()
            .connect(keyedDataStream)
            .flatMap(new MyCoFlatMap());
        result.print();
        env.execute();
    }

    private static final class MyCoFlatMap
            implements CoFlatMapFunction<Tuple1<String>, Tuple1<String>, String> {

        // Plain Java field, not Flink managed state: local to each parallel instance, not checkpointed.
        private final Set<Tuple1<String>> blacklist = new HashSet<>();

        @Override
        public void flatMap1(Tuple1<String> control_value, Collector<String> out) {
            blacklist.add(control_value);
            out.collect("listed " + control_value);
        }

        @Override
        public void flatMap2(Tuple1<String> data_value, Collector<String> out) {
            if (blacklist.contains(data_value)) {
                out.collect("skipped " + data_value);
            } else {
                out.collect("passed " + data_value);
            }
        }
    }
}
```
Below is the test output:

```
1> passed (data)
1> passed (DROP)
3> passed (artisans)
3> passed (IGNORE)
3> passed (IGNORE)
3> passed (IGNORE)
3> passed (IGNORE)
3> listed (DROP)
3> listed (IGNORE)
1> listed (DROP)
1> listed (IGNORE)
2> listed (DROP)
2> listed (IGNORE)
```
Flink 1.5.0 release announcement: https://data-artisans.com/blog/apache-flink-1-5-0-release-announcement
Here's a code sample that implements both Flink's original no-argument `broadcast` method and the broadcast state newly introduced in Flink 1.5.0: https://gist.github.com/syhily/932e0d1e0f12b3e951236d6c36e5a7ed
As far as I can tell, broadcast state can be used without Flink CEP, as that code shows.
The original `DataStream`'s `broadcast` method (with no arguments) creates a `DataStream` instead of a `BroadcastConnectedStream`, which follows the original co-group design. After connecting the metrics stream with a broadcasted rule stream, we can apply further transformations defined in `ConnectedStreams`, such as `keyBy`. This makes elements of the broadcasted stream and the connected stream that have the same key get `process`ed by, and stay on, the same parallel `CoProcessFunction` instance. So the `CoProcessFunction` can have its own local storage: the process function can keep a custom data structure in one of its fields, rather than a map state accessed through `ReadOnlyContext`.

Broadcast state is created by a `broadcast` method that takes a set of `MapStateDescriptor`s, which means the broadcasted stream can be connected with other streams many times. Different `BroadcastConnectedStream`s can share the broadcast state, each accessing it through a unique `MapStateDescriptor` in its `process` function. I think these are the key differences between the no-argument `broadcast()` and broadcast state.
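The point that one broadcast stream can be connected to several streams, each reading the same state through its `MapStateDescriptor`, can be sketched as below. This is a minimal sketch assuming the Flink 1.5 broadcast state API (`DataStream#broadcast(MapStateDescriptor...)`, `DataStream#connect(BroadcastStream)`, `BroadcastProcessFunction`). The class name `SharedBroadcastStateSketch`, the descriptor name `"rules"`, the `BlacklistFilter` function and the `orders`/`clicks` streams are all hypothetical.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class SharedBroadcastStateSketch {

    // Hypothetical descriptor: blacklisted word -> true.
    public static final MapStateDescriptor<String, Boolean> RULES = new MapStateDescriptor<>(
        "rules", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO);

    /** Hypothetical function: forwards elements that are not in the broadcast state. */
    public static class BlacklistFilter extends BroadcastProcessFunction<String, String, String> {
        private final String label;

        public BlacklistFilter(String label) {
            this.label = label;
        }

        @Override
        public void processElement(String value, ReadOnlyContext ctx, Collector<String> out)
                throws Exception {
            // The non-broadcast side only gets read-only access to the broadcast state.
            if (!ctx.getBroadcastState(RULES).contains(value)) {
                out.collect(label + " passed " + value);
            }
        }

        @Override
        public void processBroadcastElement(String rule, Context ctx, Collector<String> out)
                throws Exception {
            // The broadcast side may modify the state; every parallel instance sees every rule.
            ctx.getBroadcastState(RULES).put(rule, true);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // One broadcast stream, declared together with the state descriptor it populates.
        BroadcastStream<String> rules = env.fromElements("DROP", "IGNORE").broadcast(RULES);

        DataStream<String> orders = env.fromElements("order-1", "DROP");
        DataStream<String> clicks = env.fromElements("click-1", "IGNORE");

        // The same BroadcastStream is connected to two different streams.
        orders.connect(rules).process(new BlacklistFilter("orders")).print();
        clicks.connect(rules).process(new BlacklistFilter("clicks")).print();

        env.execute("shared broadcast state sketch");
    }
}
```

Flink gives no ordering guarantee between the broadcast input and the other input, so a data element can still pass if it arrives before the matching rule, just as "DROP" and "IGNORE" passed in the test output in the question.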
Without broadcast state, two Flink data streams cannot be processed together in a stateful way unless they are keyed in exactly the same way. A broadcast stream can be connected to a keyed stream, but if you then try to use keyed state in a `RichCoFlatMapFunction`, for example, that will fail.
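For contrast, here is a sketch of the case that does work without broadcast state: both streams are keyed the same way, so a `RichCoFlatMapFunction` can use keyed `ValueState`. It assumes the standard DataStream API (`KeyedStream#connect`, `ValueStateDescriptor`). `SameKeyConnectSketch`, `Identity` and `KeyedBlacklist` are hypothetical names. Note the limitation: a control word only affects data with that same key, so this cannot express rules that apply to every event regardless of key.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class SameKeyConnectSketch {

    /** Hypothetical key selector: each string is its own key. */
    public static class Identity implements KeySelector<String, String> {
        @Override
        public String getKey(String value) {
            return value;
        }
    }

    /** Hypothetical function using keyed state shared by both inputs. */
    public static class KeyedBlacklist extends RichCoFlatMapFunction<String, String, String> {
        private transient ValueState<Boolean> blacklisted;

        @Override
        public void open(Configuration parameters) {
            blacklisted = getRuntimeContext().getState(
                new ValueStateDescriptor<>("blacklisted", Boolean.class));
        }

        @Override
        public void flatMap1(String control, Collector<String> out) throws Exception {
            // Scoped to the current key, i.e. only to data elements equal to this control word.
            blacklisted.update(true);
            out.collect("listed " + control);
        }

        @Override
        public void flatMap2(String data, Collector<String> out) throws Exception {
            Boolean flag = blacklisted.value(); // null if this key was never listed
            out.collect((flag != null && flag) ? "skipped " + data : "passed " + data);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> control = env.fromElements("DROP", "IGNORE");
        DataStream<String> data = env.fromElements("data", "DROP", "artisans", "IGNORE");

        // Both inputs keyed identically, so keyed state is available in the co-function.
        control.keyBy(new Identity())
            .connect(data.keyBy(new Identity()))
            .flatMap(new KeyedBlacklist())
            .print();

        env.execute("same-key connect sketch");
    }
}
```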
What is frequently desired is to be able to have one stream with dynamic "rules" that are to be applied to every event on another stream, regardless of key. There needed to be a new kind of managed Flink state in which these rules could be stored. With broadcast state this can now be done in a straightforward way.
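The blacklist test from the question can be rewritten with broadcast state along these lines. This is a minimal sketch assuming the Flink 1.5 API (`KeyedStream#connect(BroadcastStream)`, `KeyedBroadcastProcessFunction`). `BroadcastRulesSketch`, `ApplyBlacklist` and the descriptor name `"blacklist"` are hypothetical. The rules live in managed broadcast state rather than a plain field, and every key reads the same rules.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastRulesSketch {

    // Hypothetical descriptor for the managed broadcast state holding the rules.
    public static final MapStateDescriptor<String, Boolean> BLACKLIST = new MapStateDescriptor<>(
        "blacklist", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO);

    /** Hypothetical function: key type, keyed input, broadcast input, output are all String. */
    public static class ApplyBlacklist
            extends KeyedBroadcastProcessFunction<String, String, String, String> {

        @Override
        public void processElement(String value, ReadOnlyContext ctx, Collector<String> out)
                throws Exception {
            // Keyed side: read-only view of the rules, identical for every key.
            boolean listed = ctx.getBroadcastState(BLACKLIST).contains(value);
            out.collect((listed ? "skipped " : "passed ") + value);
        }

        @Override
        public void processBroadcastElement(String rule, Context ctx, Collector<String> out)
                throws Exception {
            // Broadcast side: the only place the rules may be modified.
            ctx.getBroadcastState(BLACKLIST).put(rule, true);
            out.collect("listed " + rule);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        BroadcastStream<String> rules = env.fromElements("DROP", "IGNORE").broadcast(BLACKLIST);
        DataStream<String> data = env.fromElements("data", "DROP", "artisans", "IGNORE");

        data.keyBy(new KeySelector<String, String>() {
                @Override
                public String getKey(String value) {
                    return value;
                }
            })
            .connect(rules)
            .process(new ApplyBlacklist())
            .print();

        env.execute("broadcast rules sketch");
    }
}
```

Unlike the `HashSet` field in the question's `MyCoFlatMap`, this state is managed by Flink and included in checkpoints. It still carries no ordering guarantee between rules and data.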
With this feature now in place, work on support for dynamic patterns in CEP can begin.