Apache Flink - Send event if no data was received

2019-05-28 13:29发布

How can I implement an operator with Flink's DataStream API that sends an event when no data was received from a stream for a certain amount of time?

2条回答
冷血范
2楼-- · 2019-05-28 14:13

You could set up a time window with a custom trigger function. In the trigger function, every time the an event is received the "onEvent" method would set a processingTimeTrigger to "currentTime + desiredTimeDelay". Then when a new event comes, you delete the trigger that was previously set and make a new one. If an event doesn't come by the time the system time is the time on the processingTimeTrigger, it fires and the window would be processed. Even if no events came, the list of events that are going to be processed would just be empty.

查看更多
3楼-- · 2019-05-28 14:22

Such an operator can be implemented using a ProcessFunction.

DataStream<Long> input = env.fromElements(1L, 2L, 3L, 4L);

input
  // use keyBy to have keyed state. 
  // NullByteKeySelector will move all data to one task. You can also use other keys
  .keyBy(new NullByteKeySelector())
  // use process function with 60 seconds timeout
  .process(new TimeOutFunction(60 * 1000));

The TimeOutFunction is defined as follows. In this example it uses processing time.

public static class TimeOutFunction extends ProcessFunction<Long, Boolean> {

  // delay after which an alert flag is thrown
  private final long timeOut;
  // state to remember the last timer set
  private transient ValueState<Long> lastTimer;

  public TimeOutFunction(long timeOut) {
    this.timeOut = timeOut;
  }

  @Override
  public void open(Configuration conf) {
    // setup timer state
    ValueStateDescriptor<Long> lastTimerDesc = 
      new ValueStateDescriptor<Long>("lastTimer", Long.class);
    lastTimer = getRuntimeContext().getState(lastTimerDesc);
  }

  @Override
  public void processElement(Long value, Context ctx, Collector<Boolean> out) throws Exception {
    // get current time and compute timeout time
    long currentTime = ctx.timerService().currentProcessingTime();
    long timeoutTime = currentTime + timeOut;
    // register timer for timeout time
    ctx.timerService().registerProcessingTimeTimer(timeoutTime);
    // remember timeout time
    lastTimer.update(timeoutTime);
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext ctx, Collector<Boolean> out) throws Exception {
    // check if this was the last timer we registered
    if (timestamp == lastTimer.value()) {
      // it was, so no data was received afterwards.
      // fire an alert.
      out.collect(true);
    }
  }
}
查看更多
登录 后发表回答