I'm programming a simple example for testing the new Scala API for CEP in Flink, using the latest Github version for 1.1-SNAPSHOT.
The Pattern is only a check for a value, and outputs a single String as a result for each pattern matched. Code is as follows:
val pattern : Pattern[(String, Long, Int), _] = Pattern.begin("start").where(_._3 < 4)
val cepEventAlert = CEP.pattern(streamingAlert, pattern)
def selectFn(pattern : mutable.Map[String, (String, Long, Int)]): String = {
val startEvent = pattern.get("start").get
"Alerta:"+startEvent._1+": Pattern"
}
val patternStreamSelected = cepEventAlert.select(selectFn(_))
patternStreamSelected.print()
It compiles and runs under 1.1-SNAPSHOT without issue, but the jobmanager output shows no sign of that print(). Even relaxing the pattern conditions, and setting only a "start" (Accepting all events) returns absolutely nothing.
Also, when trying to add stages, the code fails to compile. If I change the Pattern to (Finding two consecutive events with 3rd field less than 4):
Pattern.begin("start").where(_._3 < 4).next("end").where(_._3 < 4).within(Time.seconds(30))
The compiler then throws:
error: missing parameter type for expanded function ((x$4) => x$4._3.$less(4))
Showing the error is on the first where() after the "start" stage. I tryed to explicitly set the parameter type with:
(x: (String, Long, Int)) => x._3 < 4
That way it compiles again, but when it runs on Flink, then no output is shown. StreamingAlert is a Scala DataStream[(String, Long, Int)], and in other parts of code, I can filter with _._ < 4
without problems and the output seems correct.