Detecting keyed state changes

Posted 2020-04-21 06:34

Question:

I'm new to the Dataflow programming model and have some trouble wrapping my mind around what I think should be a simple use case:

I have a pipeline reading live data from Pub/Sub. The data contains device statuses with (simplified) a serial number and a state (UP or DOWN). A device is guaranteed to send its state at least every 5 minutes, but a device may of course send the same state multiple times.

What I'm trying to achieve is a pipeline that only emits state changes for a device, so basically keeping track of some notion of "last state per key" for a given key and comparing new events to that.

Is there a good way to do this at the moment?

Answer 1:

There is a related question at "Remove duplicates across window triggers/firings" but your question brings up some subtleties that differ. So let me address two aspects separately and refer some parts to the linked question.

1. Taking the latest input value

Your question differs here because it is not obviously outputting the result of an associative & commutative Combine operation. This is important because in Dataflow & Beam, the input is not ordered - it just carries timestamps so that we can reason about it in event time.

Over pairs of (timestamp, UP/DOWN) you can define an associative & commutative operation that just takes the maximum of the timestamp, and carries the state with it. You'll have to make an arbitrary choice in the case of two equal timestamps, but it sounds like you don't expect to encounter this situation.
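
As a minimal sketch of such an operation, written in plain Java without any Beam dependency: the class name LatestReading, the use of java.time.Instant, and the String-valued state are assumptions made for illustration. Ties on equal timestamps are broken by comparing the state strings, which is one arbitrary but order-independent choice.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.List;

// Hypothetical type for illustration: a (timestamp, state) pair.
final class LatestReading {
    final Instant timestamp;
    final String state; // "UP" or "DOWN"

    LatestReading(Instant timestamp, String state) {
        this.timestamp = timestamp;
        this.state = state;
    }

    // Keeps the reading with the larger timestamp and carries its state along.
    // On equal timestamps the larger state string wins, so the result does not
    // depend on argument order or grouping.
    static LatestReading merge(LatestReading a, LatestReading b) {
        int byTime = a.timestamp.compareTo(b.timestamp);
        if (byTime != 0) {
            return byTime > 0 ? a : b;
        }
        return a.state.compareTo(b.state) >= 0 ? a : b;
    }

    // Folds an unordered list of readings; returns null for an empty list.
    static LatestReading latest(List<LatestReading> readings) {
        return readings.stream().reduce(LatestReading::merge).orElse(null);
    }

    public static void main(String[] args) {
        LatestReading r1 = new LatestReading(Instant.parse("2020-01-01T00:00:00Z"), "UP");
        LatestReading r2 = new LatestReading(Instant.parse("2020-01-01T00:05:00Z"), "DOWN");
        LatestReading r3 = new LatestReading(Instant.parse("2020-01-01T00:03:00Z"), "UP");
        // The arrival order of the readings does not affect the result.
        System.out.println(latest(Arrays.asList(r3, r1, r2)).state);
    }
}
```

Because merge does not care about ordering or grouping, a function of this shape could back a Combine over unordered input.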

In order to express your desires naturally, we would need a feature whereby GroupByKey also does a secondary sort of your values per key (and window). In this case, you would sort by timestamp, but the feature is pretty general and we are aware of the use case.

That will get you as far as being able to express the "take the latest value" part of your logic.

2. Only produce output when the result has changed

This aspect corresponds directly to the linked question. Your question is different in that even having defined an associative & commutative operation, you lack a canonical identity element. In the answer there, filtering out of the identity element was key to approximating incremental output.

You could come up with schemes for encoding whether or not a change is necessary, such as expanding your accumulator type to tuples of (timestamp, CHANGE/NO_CHANGE, UP/DOWN) where there is the possibility of a monotonic transition from NO_CHANGE to CHANGE. But this only really helps if you have an identity element tagged with NO_CHANGE. And given an arbitrary choice between UP and DOWN it can only reduce data volume by half.

In your case, I would not try to express "output only when the combined result has changed" directly. Instead, I would strongly suggest managing the state machine yourself using the stateful processing features available in Apache Beam, which will be the basis for Dataflow 2.x.

The stateful DoFn code might look something like this:

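// Beam Java SDK 2.x imports; DeviceId and UpDown are application-defined types.
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;

new DoFn<KV<DeviceId, UpDown>, KV<DeviceId, UpDown>>() {

  // Event-time timestamp of the most recently emitted state change for this key.
  @StateId("latestTimestamp")
  private final StateSpec<ValueState<Instant>> latestTimestampSpec =
      StateSpecs.value(InstantCoder.of());

  // Most recently emitted state for this key.
  @StateId("latestOutput")
  private final StateSpec<ValueState<UpDown>> latestOutputSpec =
      StateSpecs.value(UpDown.getCoder());

  @ProcessElement
  public void processElement(
      ProcessContext c,
      @StateId("latestTimestamp") ValueState<Instant> latestTimestampState,
      @StateId("latestOutput") ValueState<UpDown> latestOutputState) {

    // Both reads return null for the first element seen for a key.
    Instant latestTimestamp = latestTimestampState.read();
    UpDown latestOutput = latestOutputState.read();
    Instant newTimestamp = c.timestamp();
    UpDown newValue = c.element().getValue();

    boolean isNewer = latestTimestamp == null || newTimestamp.isAfter(latestTimestamp);
    if (isNewer && !newValue.equals(latestOutput)) {
      c.output(KV.of(c.element().getKey(), newValue));
      latestTimestampState.write(newTimestamp);
      latestOutputState.write(newValue);
    }
  }
}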
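
To see which events such a check lets through, here is a plain-Java sketch of the same per-key logic with no Beam dependency. Two HashMaps stand in for the per-key state cells; the class name StateChangeFilter, the method accept, and the String-typed device ids and states are all hypothetical and chosen only for illustration.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for per-key state: one map entry per device.
final class StateChangeFilter {
    private final Map<String, Instant> latestTimestamp = new HashMap<>();
    private final Map<String, String> latestOutput = new HashMap<>();

    // Returns true when the event is newer than the last emitted event for
    // this device and carries a different state; only then is state updated.
    boolean accept(String deviceId, Instant timestamp, String state) {
        Instant lastTimestamp = latestTimestamp.get(deviceId);
        String lastState = latestOutput.get(deviceId);
        boolean isNewer = lastTimestamp == null || timestamp.isAfter(lastTimestamp);
        if (isNewer && !state.equals(lastState)) {
            latestTimestamp.put(deviceId, timestamp);
            latestOutput.put(deviceId, state);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        StateChangeFilter filter = new StateChangeFilter();
        Instant t0 = Instant.parse("2020-04-21T06:00:00Z");
        // First event for a device is always emitted.
        System.out.println(filter.accept("A", t0, "UP"));
        // Repeated state is suppressed.
        System.out.println(filter.accept("A", t0.plusSeconds(300), "UP"));
        // A real change is emitted.
        System.out.println(filter.accept("A", t0.plusSeconds(600), "DOWN"));
        // A late event older than the last emitted change is dropped.
        System.out.println(filter.accept("A", t0.plusSeconds(420), "UP"));
    }
}
```

Note that the stored timestamp advances only when something is emitted, so a late event is rejected only if it is older than the last emitted change, not older than the last event seen.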

This and the linked question are both inspirations for the example I used in this blog post on the Beam blog. So you might read up there for more details.