I am writing a Flink CEP program inside a Lagom microservice implementation. My Flink CEP program runs perfectly fine in a simple Scala application, but when I use this code inside the Lagom service implementation I receive the following exception.
Lagom Service Implementation
/**
 * Builds a Flink CEP pipeline over the Kafka topic `topic_test`, runs it, and yields a
 * request handler that answers every request with "Done".
 *
 * The pipeline matches three strictly consecutive events whose `ABC` field is 5, then 10,
 * then 5, and prints "Alert Detected" for every match.
 *
 * Everything above the trailing `req => ...` lambda is evaluated while the `ServiceCall`
 * is being constructed (when `start` is invoked), not inside the request handler.
 *
 * NOTE(review): `env.execute` is invoked synchronously here. With a Kafka source the job is
 * presumably unbounded, so this call may never return and the handler below may never be
 * produced -- confirm, and consider submitting the job off the calling thread.
 */
override def start = ServiceCall[NotUsed, String] {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)
  env.getConfig.disableSysoutLogging()

  val topicName = "topic_test"

  // NOTE(review): "acks", the two "*.serializer" entries and "block.on.buffer.full" are
  // Kafka producer settings, yet these properties are handed to a consumer. They are kept
  // unchanged here; confirm whether consumer settings (e.g. "group.id") were intended.
  val props = new Properties
  props.put("bootstrap.servers", "localhost:9092")
  props.put("acks", "all")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
  props.put("block.on.buffer.full", "false")

  val kafkaSource = new FlinkKafkaConsumer010(topicName, new KafkaDeserializeSchema, props)
  val stream = env.addSource(kafkaSource)

  // begin(ABC == 5) -> next(ABC == 10) -> end(ABC == 5), each step directly following the last.
  val deliveryPattern = Pattern
    .begin[XYZ]("begin").where(_.ABC == 5)
    .next("next").where(_.ABC == 10)
    .next("end").where(_.ABC == 5)

  val deliveryPatternStream = CEP.pattern(stream, deliveryPattern)

  // The matched events are not inspected; every match produces the same alert text.
  def selectFn(pattern: collection.mutable.Map[String, XYZ]): String = "Alert Detected"

  deliveryPatternStream.select(selectFn(_)).print()

  env.execute("CEP")

  req => Future.successful("Done")
}
}
I don't understand how to resolve this issue.