How to filter Apache Flink stream on the basis of

Posted 2020-07-27 04:13

I have two streams: one of Int and the other of JSON. The JSON schema has a key whose value is an int. I need to filter the JSON stream by comparing that key against the values in the integer stream. Is this possible in Flink?

1 answer
欢心
#2 · 2020-07-27 04:57

Yes, you can do this kind of stream processing with Flink. The basic building blocks you need from Flink are connected streams and stateful functions. Here's an example using a RichCoFlatMapFunction:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class Connect {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Event> control = env.fromElements(
                new Event(17),
                new Event(42))
                .keyBy("key");

        DataStream<Event> data = env.fromElements(
                new Event(2),
                new Event(42),
                new Event(6),
                new Event(17),
                new Event(8),
                new Event(42)
                )
                .keyBy("key");

        DataStream<Event> result = control
                .connect(data)
                .flatMap(new MyConnectedStreams());

        result.print();

        env.execute();
    }

    static final class MyConnectedStreams
            extends RichCoFlatMapFunction<Event, Event, Event> {

        private ValueState<Boolean> seen = null;

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<>(
                    // state name
                    "have-seen-key",
                    // type information of state
                    TypeInformation.of(new TypeHint<Boolean>() {
                    }));
            seen = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void flatMap1(Event control, Collector<Event> out) throws Exception {
            seen.update(Boolean.TRUE);
        }

        @Override
        public void flatMap2(Event data, Collector<Event> out) throws Exception {
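            // seen.value() is null for keys never seen on the control stream;
            // equals() avoids comparing Boolean objects by reference.
            if (Boolean.TRUE.equals(seen.value())) {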
                out.collect(data);
            }
        }
    }


    public static final class Event {
        public Event() {
        }

        public Event(int key) {
            this.key = key;
        }

        public int key;

        @Override
        public String toString() {
            return String.valueOf(key);
        }
    }
}

In this example, only data-stream events whose keys have already been seen on the control stream are passed through; all other events are filtered out. I've taken advantage of Flink's managed keyed state and connected streams.

To keep this simple I've ignored your requirement that the data stream has JSON, but you can find examples of how to work with JSON and Flink elsewhere.
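As a rough illustration of the JSON step, here is a minimal sketch that pulls an integer field out of a JSON string using only the JDK. The field name "key" and the class name JsonKey are assumptions made for illustration, since the question does not give the schema. A regex like this is brittle for nested or escaped JSON, so a real job would more likely parse with a JSON library inside a map step before keyBy.

```java
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JsonKey {
    // Matches "key": <integer>. The field name "key" is an assumed placeholder.
    private static final Pattern KEY_FIELD =
            Pattern.compile("\"key\"\\s*:\\s*(-?\\d+)");

    /** Returns the integer value of the "key" field, or empty if none is found. */
    public static OptionalInt extractKey(String json) {
        Matcher m = KEY_FIELD.matcher(json);
        if (!m.find()) {
            return OptionalInt.empty();
        }
        try {
            return OptionalInt.of(Integer.parseInt(m.group(1)));
        } catch (NumberFormatException e) {
            // Digits were present but the value does not fit in an int.
            return OptionalInt.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(extractKey("{\"name\":\"a\",\"key\": 42}"));
        System.out.println(extractKey("{\"name\":\"b\"}"));
    }
}
```

Records for which extractKey returns empty could be dropped or routed elsewhere before the stream is keyed; which one is appropriate depends on the data.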

Note that your results will be non-deterministic, since you have no control over the timing of the two streams relative to one another. You could manage this by adding event-time timestamps to the streams, and then using a KeyedCoProcessFunction (RichCoProcessFunction in early Flink releases) instead.
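To make the ordering issue concrete, here is a small plain-Java model of the filter, not Flink code. It is a sketch under stated assumptions: the class OrderingDemo, the Arrival type, and the helper methods are hypothetical names, and a HashSet stands in for the per-key "seen" state. The same elements are fed in two different arrival orders.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class OrderingDemo {
    /** One arriving element; fromControl says which stream delivered it. */
    public static final class Arrival {
        final boolean fromControl;
        final int key;

        Arrival(boolean fromControl, int key) {
            this.fromControl = fromControl;
            this.key = key;
        }
    }

    public static Arrival control(int key) {
        return new Arrival(true, key);
    }

    public static Arrival data(int key) {
        return new Arrival(false, key);
    }

    /** Applies the same rule as the two flatMap methods, in arrival order. */
    public static List<Integer> run(List<Arrival> arrivals) {
        Set<Integer> seen = new HashSet<>(); // stands in for the keyed state
        List<Integer> out = new ArrayList<>();
        for (Arrival a : arrivals) {
            if (a.fromControl) {
                seen.add(a.key);            // control side: remember the key
            } else if (seen.contains(a.key)) {
                out.add(a.key);             // data side: emit only if key was seen
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Control element arrives before the matching data element.
        System.out.println(run(Arrays.asList(control(42), data(42), data(6)))); // [42]
        // Same elements, but the data element arrives first and is dropped.
        System.out.println(run(Arrays.asList(data(42), control(42), data(6)))); // []
    }
}
```

In a running job the interleaving of the two inputs is not under your control, so either outcome is possible for an event that arrives close in time to its control record.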
