Flink Complex Event Processing

Posted 2019-09-27 17:50

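I have Flink CEP code that reads from a socket and detects a pattern. Let's say the pattern (a word) is "alert". If the word "alert" occurs five times or more, an alert should be created. But I am getting an input mismatch error. The Flink version is 1.3.0. Thanks in advance!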

    package pattern;

    import org.apache.flink.cep.CEP;
    import org.apache.flink.cep.PatternStream;
    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.cep.pattern.conditions.IterativeCondition;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    import java.util.List;
    import java.util.Map;

    public class cep {

        public static void main(String[] args) throws Exception {

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStreamSource<String> dss = env.socketTextStream("localhost", 3005);

            dss.print();

            Pattern<String, String> pattern = Pattern.<String>begin("first")
                    .where(new IterativeCondition<String>() {
                        @Override
                        public boolean filter(String word, Context<String> context) throws Exception {
                            return word.equals("alert");
                        }
                    })
                    .times(5);

            PatternStream<String> patternstream = CEP.pattern(dss, pattern);

            DataStream<String> alerts = patternstream
                    .flatSelect((Map<String, List<String>> in, Collector<String> out) -> {
                        String first = in.get("first").get(0);
                        for (int i = 0; i < 6; i++) {
                            out.collect(first);
                        }
                    });

            alerts.print();

            env.execute();
        }
    }

Answer 1:

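Just some clarification on the original problem. In 1.3.0 there was a bug that made it impossible to pass lambda expressions as arguments to select/flatSelect.

It was fixed in 1.3.1, so the first version of your code will work with 1.3.1.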


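Also, I think you are misinterpreting the times quantifier. It matches an exact number of occurrences. So in your case, with times(5), it will return only when the event is matched exactly 5 times, not 5 or more.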
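To make the distinction concrete, here is a minimal plain-Java sketch that contrasts "exactly n" with "n or more" on a finite list of words. It deliberately does not use the Flink CEP API: the class name `QuantifierSketch` and the helpers `occurrences`, `matchesExactly`, and `matchesAtLeast` are hypothetical names made up for illustration, and the sketch only counts occurrences rather than modelling how CEP emits matches on an unbounded stream.

```java
import java.util.Arrays;
import java.util.List;

public class QuantifierSketch {

    // Counts how many entries in the list equal the given word.
    static long occurrences(List<String> words, String word) {
        return words.stream().filter(word::equals).count();
    }

    // "Exactly n" semantics: true only when the word occurs n times, no more and no fewer.
    static boolean matchesExactly(List<String> words, String word, int n) {
        return occurrences(words, word) == n;
    }

    // "n or more" semantics: true as soon as the word occurs at least n times.
    static boolean matchesAtLeast(List<String> words, String word, int n) {
        return occurrences(words, word) >= n;
    }

    public static void main(String[] args) {
        List<String> five = Arrays.asList("alert", "alert", "alert", "alert", "alert");
        List<String> six = Arrays.asList("alert", "alert", "alert", "alert", "alert", "alert");

        System.out.println(matchesExactly(five, "alert", 5)); // true
        System.out.println(matchesExactly(six, "alert", 5));  // false
        System.out.println(matchesAtLeast(five, "alert", 5)); // true
        System.out.println(matchesAtLeast(six, "alert", 5));  // true
    }
}
```

The requirement in the question ("five times or more") corresponds to the second helper. Newer Flink releases add a `timesOrMore(n)` quantifier to the CEP `Pattern` API for this case; whether it is available depends on the version you run, so check the CEP documentation for your release.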



Answer 2:

So, I have got the code working. Here is the working solution:

    package pattern;

    import org.apache.flink.cep.CEP;
    import org.apache.flink.cep.PatternSelectFunction;
    import org.apache.flink.cep.PatternStream;
    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.cep.pattern.conditions.IterativeCondition;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    import java.util.List;
    import java.util.Map;

    public class cep {

        public static void main(String[] args) throws Exception {

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Read lines of text from a local socket.
            DataStreamSource<String> dss = env.socketTextStream("localhost", 3005);

            dss.print();

            // Match the word "alert" repeated 5 times.
            Pattern<String, String> pattern = Pattern.<String>begin("first")
                    .where(new IterativeCondition<String>() {
                        @Override
                        public boolean filter(String word, Context<String> context) throws Exception {
                            return word.equals("alert");
                        }
                    })
                    .times(5);

            PatternStream<String> patternstream = CEP.pattern(dss, pattern);

            // An anonymous PatternSelectFunction is used here instead of a lambda.
            DataStream<String> alerts = patternstream
                    .select(new PatternSelectFunction<String, String>() {
                        @Override
                        public String select(Map<String, List<String>> in) throws Exception {
                            // Events matched by the pattern step named "first".
                            String first = in.get("first").get(0);

                            if (first.equals("alert")) {
                                return "5 or more alerts";
                            } else {
                                return " ";
                            }
                        }
                    });

            alerts.print();

            env.execute();
        }
    }

