Flink CEP No Results Printed

2019-02-19 15:13发布

I am trying to print out a string if Hello and world are found using the Flink CEP library. My source is Kafka and using the console-producer to input the data. That part is working. I can print out what I enter into the topic. However, it will not print out my final message "The world is so nice!". It will not even print out that it entered the lambda. Below is the class

package kafka;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

import java.util.Map;
import java.util.Properties;

/**
 * Created by crackerman on 9/16/16.
*/
public class WordCount {

public static void main(String[] args) throws Exception {

    Properties properties = new Properties();
    properties.put("bootstrap.servers", "localhost:9092");
    properties.put("zookeeper.connect", "localhost:2181");
    properties.put("group.id", "test");
    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    DataStream<String> src = see.addSource(new FlinkKafkaConsumer08<>("complexString",
                                                                      new SimpleStringSchema(),
                                                                      properties));

    src.print();


    Pattern<String, String> pattern = Pattern.<String>begin("first")
            .where(evt -> evt.contains("Hello"))
            .followedBy("second")
            .where(evt -> evt.contains("World"));

    PatternStream<String> patternStream = CEP.pattern(src, pattern);

    DataStream<String> alerts = patternStream.flatSelect(
            (Map<String, String> in, Collector<String> out) -> {
                System.out.println("Made it to the lambda");
                String first = in.get("first");
                String second = in.get("second");
                System.out.println("First: " + first);
                System.out.println("Second: " + second);

                if (first.equals("Hello") && second.equals("World")) {

                    out.collect("The world is so nice!");
                }


            });

    alerts.print();

    see.execute();
}

}

Any help would be greatly appreciated.

Thanks!

1条回答
虎瘦雄心在
2楼-- · 2019-02-19 15:58

The issue is the following line

 see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

If that is removed, it works the way I expected it to.

查看更多
登录 后发表回答