Find count in WindowedStream - Flink

Posted 2019-08-19 01:24

Question:

I am pretty new to the world of streams and I am facing some issues in my first attempt.

More specifically, I am trying to implement a count and groupBy functionality in a sliding window using Flink.

I've done it on a normal DataStream, but I am not able to make it work on a WindowedStream.

Do you have any suggestions on how I can do it?

val parsedStream: DataStream[(String, Response)] = stream
      .mapWith(_.decodeOption[Response])
      .filter(_.isDefined)
      .map { record =>
        (
          s"${record.get.group.group_country}, ${record.get.group.group_state}, ${record.get.group.group_city}",
          record.get
        )
      }

val result: DataStream[((String, Response), Int)] = parsedStream
      .map((_, 1))
      .keyBy(_._1._1)
      .sum(1)

// The output of result is 
// ((us, GA, Atlanta,Response()), 14)
// ((us, SA, Atlanta,Response()), 4)

result
      .keyBy(_._1._1)
      .timeWindow(Time.seconds(5))

//the following part doesn't compile

      .apply(
        new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
          def apply(
                   key: Tuple,
                   window: TimeWindow,
                   values: Iterable[(String, Response)],
                   out: Collector[(String, Int)]
                   ) {}
        }
      )

Compilation Error:

overloaded method value apply with alternatives:
  [R](function: (String, org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[((String, com.flink.Response), Int)], org.apache.flink.util.Collector[R]) => Unit)(implicit evidence$28: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R] <and>
  [R](function: org.apache.flink.streaming.api.scala.function.WindowFunction[((String, com.flink.Response), Int),R,String,org.apache.flink.streaming.api.windowing.windows.TimeWindow])(implicit evidence$27: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
 cannot be applied to (org.apache.flink.streaming.api.functions.windowing.WindowFunction[((String, com.flink.Response), Int),(String, com.flink.Response),String,org.apache.flink.streaming.api.windowing.windows.TimeWindow]{def apply(key: String,window: org.apache.flink.streaming.api.windowing.windows.TimeWindow,input: Iterable[((String, com.flink.Response), Int)],out: org.apache.flink.util.Collector[(String, com.flink.Response)]): Unit})
      .apply(

Answer 1:

This is a simpler example that we can work on:

val source: DataStream[(JsonField, Int)] = env.fromElements(("hello", 1), ("hello", 2))

    val window2 = source
      .keyBy(0)
      .timeWindow(Time.minutes(1))
      .apply(new WindowFunction[(JsonField, Int), Int, String, TimeWindow] {})



Answer 2:

I tried your code and found the problem: the type parameters declared for your WindowFunction are wrong.

The documentation says that the type parameters of WindowFunction are WindowFunction[IN, OUT, KEY, W <: Window]. In your code, IN must be the element type of the stream you are computing windows on. That type is ((String, Response), Int), not (String, Int) as declared in the code.

Change the part that is not compiling to:

.apply(new WindowFunction[((String, Response), Int), (String, Response), String, TimeWindow] {
        override def apply(key: String, window: TimeWindow, input: Iterable[((String, Response), Int)], out: Collector[(String, Response)]): Unit = ???
})
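To make the role of each type parameter concrete, here is a minimal sketch of a window function that emits a count per key. It assumes a Flink version that still offers timeWindow (as the question does), and the Response case class and the sample elements are hypothetical stand-ins for the asker's data. Note the import: the error message quoted in the question names org.apache.flink.streaming.api.functions.windowing.WindowFunction (the Java interface), while both alternatives listed for apply expect org.apache.flink.streaming.api.scala.function.WindowFunction, so the sketch imports the Scala one.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object WindowCountSketch {
  // Hypothetical stand-in for the asker's Response type.
  case class Response(city: String)

  // IN  = ((String, Response), Int)  element type of the keyed stream
  // OUT = (String, Int)              what the function emits
  // KEY = String                     type returned by the key selector
  // W   = TimeWindow
  class CountPerKey
      extends WindowFunction[((String, Response), Int), (String, Int), String, TimeWindow] {
    override def apply(
        key: String,
        window: TimeWindow,
        input: Iterable[((String, Response), Int)],
        out: Collector[(String, Int)]): Unit =
      // Emit one (key, number of elements in this window) pair.
      out.collect((key, input.size))
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Hypothetical sample data shaped like the asker's `result` stream.
    val events: DataStream[((String, Response), Int)] = env.fromElements(
      (("us, GA, Atlanta", Response("Atlanta")), 1),
      (("us, GA, Atlanta", Response("Atlanta")), 1)
    )

    events
      .keyBy(_._1._1)              // function selector, so KEY is String
      .timeWindow(Time.seconds(5)) // tumbling; pass a second Time argument for a sliding window
      .apply(new CountPerKey)
      .print()

    env.execute("window-count-sketch")
  }
}
```

With a bounded source such as fromElements, the job may finish before a processing-time window fires, so this is meant to show that the types line up rather than to produce output; a real job would read from an unbounded source.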

EDIT: As for the second example, the error occurs for essentially the same reason. When you use keyBy on a stream of tuples, there are two overloads to choose from: keyBy(fields: Int*), which selects the key by the index of a tuple field (this is the one you used), and keyBy(fun: T => K), where you provide a function that extracts the key.

There is one important difference between the two: keyBy(fields: Int*) returns the key as a Java Tuple (org.apache.flink.api.java.tuple.Tuple), while keyBy(fun: T => K) returns the key with its exact type. So if you change the key type from String to Tuple in your simplified example, it should compile.
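The difference between the two keyBy overloads can be sketched side by side. This is an illustration only: it uses String where the simplified example uses JsonField (assumed here to be an alias for String), sums the Int field as an arbitrary choice of output, and assumes a Flink version where keyBy(fields: Int*) and timeWindow are still available.

```scala
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object KeyByKeyTypeSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val source: DataStream[(String, Int)] =
      env.fromElements(("hello", 1), ("hello", 2))

    // keyBy(0): the key is a Java Tuple, so KEY must be declared as Tuple.
    val byIndex: DataStream[Int] = source
      .keyBy(0)
      .timeWindow(Time.minutes(1))
      .apply(new WindowFunction[(String, Int), Int, Tuple, TimeWindow] {
        override def apply(
            key: Tuple,
            window: TimeWindow,
            input: Iterable[(String, Int)],
            out: Collector[Int]): Unit =
          out.collect(input.map(_._2).sum)
      })

    // keyBy(_._1): the key keeps its exact type, so KEY is String.
    val bySelector: DataStream[Int] = source
      .keyBy(_._1)
      .timeWindow(Time.minutes(1))
      .apply(new WindowFunction[(String, Int), Int, String, TimeWindow] {
        override def apply(
            key: String,
            window: TimeWindow,
            input: Iterable[(String, Int)],
            out: Collector[Int]): Unit =
          out.collect(input.map(_._2).sum)
      })

    byIndex.print()
    bySelector.print()
    env.execute("keyby-key-type-sketch")
  }
}
```

The function-based selector is usually the more convenient of the two, because the key arrives in the window function with its real type and needs no unwrapping from a Tuple.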