I am pretty new in the world of Streams and I am facing some issues in my first try.
More specifically, I am trying to implement a count and groupBy functionality in a sliding window using Flink.
I 've done it in a normal DateStream
but I am not able to make it work in a WindowedStream
.
Do you have any suggestion on how can I do it?
val parsedStream: DataStream[(String, Response)] = stream
.mapWith(_.decodeOption[Response])
.filter(_.isDefined)
.map { record =>
(
s"${record.get.group.group_country}, ${record.get.group.group_state}, ${record.get.group.group_city}",
record.get
)
}
val result: DataStream[((String, Response), Int)] = parsedStream
.map((_, 1))
.keyBy(_._1._1)
.sum(1)
// The output of result is
// ((us, GA, Atlanta,Response()), 14)
// ((us, SA, Atlanta,Response()), 4)
result
.keyBy(_._1._1)
.timeWindow(Time.seconds(5))
//the following part doesn't compile
.apply(
new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
def apply(
key: Tuple,
window: TimeWindow,
values: Iterable[(String, Response)],
out: Collector[(String, Int)]
) {}
}
)
Compilation Error:
overloaded method value apply with alternatives:
[R](function: (String, org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[((String, com.flink.Response), Int)], org.apache.flink.util.Collector[R]) => Unit)(implicit evidence$28: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R] <and>
[R](function: org.apache.flink.streaming.api.scala.function.WindowFunction[((String, com.flink.Response), Int),R,String,org.apache.flink.streaming.api.windowing.windows.TimeWindow])(implicit evidence$27: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
cannot be applied to (org.apache.flink.streaming.api.functions.windowing.WindowFunction[((String, com.flink.Response), Int),(String, com.flink.Response),String,org.apache.flink.streaming.api.windowing.windows.TimeWindow]{def apply(key: String,window: org.apache.flink.streaming.api.windowing.windows.TimeWindow,input: Iterable[((String, com.flink.Response), Int)],out: org.apache.flink.util.Collector[(String, com.flink.Response)]): Unit})
.apply(