Apache Flink broadcast state gets flushed

Posted 2019-08-24 02:58

Question:

I am using the broadcast state pattern to connect two streams, so that elements of one stream can read data that arrived on the other. The code looks like this:

import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collector

class Broadcast extends BroadcastProcessFunction[MyObject, (String, Double), MyObject] {
  // Called for every element of the broadcast stream: store it in the broadcast state.
  override def processBroadcastElement(in2: (String, Double),
                                       context: BroadcastProcessFunction[MyObject, (String, Double), MyObject]#Context,
                                       collector: Collector[MyObject]): Unit = {
    context.getBroadcastState(broadcastStateDescriptor).put(in2._1, in2._2)
  }

  // Called for every element of the non-broadcast stream: look up the value for obj.prop.
  override def processElement(obj: MyObject,
                              readOnlyContext: BroadcastProcessFunction[MyObject, (String, Double), MyObject]#ReadOnlyContext,
                              collector: Collector[MyObject]): Unit = {
    val theValue = readOnlyContext.getBroadcastState(broadcastStateDescriptor).get(obj.prop)
    // If I print the contents of the state here, it is sometimes empty.
    collector.collect(MyObject(/* new properties go here */))
  }
}

The state descriptor:

val broadcastStateDescriptor: MapStateDescriptor[String, Double] = new MapStateDescriptor[String, Double]("name_for_this", classOf[String], classOf[Double])

My execution code looks like this.

val streamA: DataStream[MyObject] = ...
val streamB: DataStream[(String, Double)] = ...
val broadcastedStream = streamB.broadcast(broadcastStateDescriptor)

streamA.connect(broadcastedStream).process(new Broadcast)

The problem is that in the processElement function the state is sometimes empty and sometimes not. The state should always contain data, since I am constantly streaming from a file that I know has data. I do not understand why the state is being flushed, leaving me unable to get the data.

I added some printing in processBroadcastElement before and after putting the data into the state, and the result is the following:

0 - 1
1 - 2 
2 - 3 
... and so on, up to 48, where it resets back to 0

UPDATE: I noticed that when I decrease the buffer timeout of the stream execution environment, the results are somewhat better; when I increase it, the map is always empty.

env.setBufferTimeout(1)   // better results
env.setBufferTimeout(200) // worse results (the default is 100)

Answer 1:

Whenever two streams are connected in Flink, you have no control over the timing with which Flink will deliver events from the two streams to your user function. So, for example, if there is an event available to process from streamA, and an event available to process from streamB, either one might be processed next. You cannot expect the broadcastedStream to somehow take precedence over the other stream.
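To make the race concrete, here is a plain-Scala model of a two-input operator. It does not use Flink at all, and the names `Event`, `BroadcastEvt`, `MainEvt` and `RaceModel` are hypothetical. Each `Seq` passed to `run` stands for one possible order in which the runtime might deliver the two inputs; the same two events produce either a hit or a miss depending only on that order.

```scala
// Plain-Scala model (no Flink) of a two-input operator whose inputs are
// delivered in an order the operator does not control. Names are hypothetical.
sealed trait Event
// An element of the broadcast input (like one (String, Double) from streamB).
final case class BroadcastEvt(key: String, value: Double) extends Event
// An element of the main input that needs the broadcast value for `prop`.
final case class MainEvt(prop: String) extends Event

object RaceModel {
  // Processes one particular interleaving of both inputs and returns, for each
  // main element, the lookup result (None = nothing in the state for that key yet).
  def run(interleaving: Seq[Event]): Vector[(String, Option[Double])] = {
    val init = (Map.empty[String, Double], Vector.empty[(String, Option[Double])])
    val (_, outputs) = interleaving.foldLeft(init) {
      case ((state, out), BroadcastEvt(k, v)) => (state + (k -> v), out)
      case ((state, out), MainEvt(p))         => (state, out :+ (p -> state.get(p)))
    }
    outputs
  }
}
```

For example, `RaceModel.run(Seq(BroadcastEvt("a", 1.0), MainEvt("a")))` finds the value, while `RaceModel.run(Seq(MainEvt("a"), BroadcastEvt("a", 1.0)))` sees an empty state, even though both calls receive exactly the same events.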

There are various strategies you might employ to cope with this race between the two streams, depending on your requirements. For example, you could use a KeyedBroadcastProcessFunction and use its applyToKeyedState method to iterate over all existing keyed state whenever a new broadcast event arrives.
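The buffering idea can be sketched as follows. This is again a plain-Scala model under stated assumptions, not Flink code: `Rule` stands for a broadcast element, `Elem` for a main-stream element assumed to be keyed by its `prop`, the per-key `pending` map plays the role of keyed ListState, and the pass over `pending` when a `Rule` arrives mimics what `applyToKeyedState` lets you do in a `KeyedBroadcastProcessFunction`. All type and method names here are hypothetical.

```scala
// Plain-Scala model (no Flink) of buffering main-stream elements until the
// broadcast side has delivered the entry they need. All names are hypothetical.
sealed trait LookupInput
// Broadcast-side element, e.g. one (String, Double) pair from streamB.
final case class Rule(key: String, value: Double) extends LookupInput
// Main-stream element, assumed keyed by `prop` (as with keyBy before the function).
final case class Elem(prop: String) extends LookupInput
final case class Enriched(prop: String, value: Double)

// rules ~ broadcast MapState, pending ~ per-key ListState, out ~ collected output.
final case class OpState(rules: Map[String, Double],
                         pending: Map[String, Vector[Elem]],
                         out: Vector[Enriched])

object BufferingModel {
  def step(s: OpState, in: LookupInput): OpState = in match {
    case e @ Elem(p) =>
      s.rules.get(p) match {
        // Broadcast entry already present: emit immediately.
        case Some(v) => s.copy(out = s.out :+ Enriched(p, v))
        // Not there yet: buffer the element under its key instead of losing the lookup.
        case None    => s.copy(pending = s.pending.updated(p, s.pending.getOrElse(p, Vector.empty[Elem]) :+ e))
      }
    case Rule(k, v) =>
      val rules = s.rules + (k -> v)
      // Visit every key's buffer (the applyToKeyedState analogue)
      // and flush the buffers that can now be resolved.
      val (ready, waiting) = s.pending.partition { case (key, _) => rules.contains(key) }
      val flushed = ready.toVector.flatMap { case (key, elems) => elems.map(_ => Enriched(key, rules(key))) }
      OpState(rules, waiting, s.out ++ flushed)
  }

  def run(inputs: Seq[LookupInput]): Vector[Enriched] =
    inputs.foldLeft(OpState(Map.empty, Map.empty, Vector.empty))(step).out
}
```

With this design, `run(Seq(Elem("a"), Rule("a", 1.0)))` and `run(Seq(Rule("a", 1.0), Elem("a")))` both yield `Enriched("a", 1.0)`, so the result no longer depends on arrival order. The cost is that main-stream elements are held in state until a matching broadcast entry arrives; elements whose key never shows up on the broadcast side stay buffered indefinitely unless you add some cleanup, for instance a timer.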



Answer 2:

As David mentioned, the job could have been restarting. I disabled checkpointing so that any exception thrown would become visible, instead of Flink silently failing and restarting the job.

It turned out that there was an error while parsing the file. The job kept restarting, so each time the state started out empty and Flink consumed the stream over and over again.
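The symptom from the question (the state size climbing to 48 and then dropping back to 0) is what such a restart loop looks like from inside the operator. The sketch below is a plain-Scala simulation, not Flink code, under these assumptions: a hypothetical `parse` that throws on a malformed `key,value` line, a hypothetical input whose 49th line is malformed, and every restarted attempt beginning with empty state (for example, because no checkpoint had completed before the failure). `RestartLoopModel` and its methods are likewise hypothetical.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Try

// Plain-Scala simulation (no Flink) of a job that fails on a malformed line and
// is restarted from scratch. All names and the input format are hypothetical.
object RestartLoopModel {
  // Hypothetical "key,value" parser that throws on bad input, like an
  // unguarded parse inside a source or map function would.
  def parse(line: String): (String, Double) = line.split(",") match {
    case Array(k, v) => (k, v.trim.toDouble) // NumberFormatException if v is not a number
    case _           => throw new IllegalArgumentException(s"malformed line: $line")
  }

  // One job attempt: the state starts empty, each parsed pair is put into it,
  // and "sizeBefore - sizeAfter" is logged, like the prints in the question.
  def attempt(lines: Seq[String], log: ArrayBuffer[String]): Try[Map[String, Double]] = Try {
    lines.foldLeft(Map.empty[String, Double]) { (state, line) =>
      val (k, v) = parse(line)
      val next = state + (k -> v)
      log += s"${state.size} - ${next.size}"
      next
    }
  }

  // Retries failed attempts up to maxAttempts times, standing in for a restart strategy.
  def runWithRestarts(lines: Seq[String], maxAttempts: Int): (List[String], Try[Map[String, Double]]) = {
    val log = ArrayBuffer.empty[String]
    var result = attempt(lines, log)
    var attempts = 1
    while (result.isFailure && attempts < maxAttempts) {
      result = attempt(lines, log)
      attempts += 1
    }
    (log.toList, result)
  }
}
```

Feeding it 48 valid lines followed by one bad line produces `0 - 1` through `47 - 48` once per attempt, then starts again at `0 - 1`. In a real job the retrying is done by Flink's restart strategy rather than a `while` loop; per the Flink documentation of that period, enabling checkpointing without configuring a restart strategy selects fixed-delay restarts with a practically unlimited number of attempts, which fits the observation that disabling checkpointing exposed the exception.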