Set timestamp in output with Kafka Streams

Posted 2020-03-26 01:41

Question:

I'm receiving CSVs in a Kafka topic "raw-data". The goal is to transform them by sending each line to another topic, "data", with the right timestamp (different for each line).

Currently, I have two streamers:

  • one that splits the lines in "raw-data" and sends them to an "internal" topic (no timestamp)
  • one with a TimestampExtractor that consumes "internal" and sends the records to "data".

I'd like to get rid of this "internal" topic by setting the timestamp directly, but I couldn't find a way (timestamp extractors are only used at consumption time).

I've stumbled upon this line in the documentation:

Note, that the described default behavior can be changed in the Processor API by assigning timestamps to output records explicitly when calling #forward().

but I couldn't find any signature with a timestamp. What do they mean?

How would you do it?

Edit: To be clear, I have a Kafka topic with one message containing the event time and some value, such as:

2018-01-01,hello
2018-01-02,world

(this is ONE message, not two)

I'd like to get two messages in another topic with the Kafka record timestamp set to their event time (2018-01-01 and 2018-01-02) without the need of an intermediate topic.

Answer 1:

Setting the timestamp of output records requires Kafka Streams 2.0 or later and is only supported in the Processor API. If you use the DSL, you can call transform() to get access to those APIs.

As you pointed out, you would use context.forward(). The call would be:

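// K, V are the input key/value types; KR, VR are placeholder output types
stream.transform(new TransformerSupplier<K, V, KeyValue<KR, VR>>() {
  public Transformer<K, V, KeyValue<KR, VR>> get() {
    return new Transformer<K, V, KeyValue<KR, VR>>() {
      private ProcessorContext context;

      public void init(ProcessorContext context) {
        this.context = context; // keep the context for use in transform()
      }

      public KeyValue<KR, VR> transform(K key, V value) {
        // some business logic

        // you can call #forward() as often as you want
        context.forward(newKey, newValue, To.all().withTimestamp(newTimestamp));

        return null; // only return data via context#forward()
      }

      public void close() {}
    };
  }
});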
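
For the CSV case in the question, the "business logic" part could look roughly like the sketch below. It uses only the JDK so it can run on its own: the Forwarder interface is a hypothetical stand-in for context.forward(key, value, To.all().withTimestamp(ts)), and the names CsvSplitSketch, toEpochMillis and splitAndForward are made up for illustration. It also assumes that the lines inside one message are separated by line breaks, that each line has the form "date,value", and that a date maps to midnight UTC of that day.

```java
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

class CsvSplitSketch {

    /** Hypothetical stand-in for context.forward(key, value, To.all().withTimestamp(ts)). */
    interface Forwarder {
        void forward(String key, String value, long timestampMs);
    }

    /** Converts an ISO date such as "2018-01-01" to epoch milliseconds (assumption: midnight UTC). */
    static long toEpochMillis(String isoDate) {
        return LocalDate.parse(isoDate).atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
    }

    /** Splits one multi-line message and forwards each line with its own timestamp. */
    static void splitAndForward(String message, Forwarder forwarder) {
        for (String line : message.split("\\R")) {
            if (line.trim().isEmpty()) {
                continue; // skip blank lines
            }
            String[] parts = line.split(",", 2); // "date,value"
            long timestampMs = toEpochMillis(parts[0].trim());
            String value = parts.length > 1 ? parts[1] : "";
            forwarder.forward(null, value, timestampMs); // null key, as an assumption
        }
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        splitAndForward("2018-01-01,hello\n2018-01-02,world",
                (key, value, ts) -> out.add(ts + " " + value));
        out.forEach(System.out::println);
    }
}
```

Inside a real Transformer, the body of splitAndForward would sit in transform(), with the forwarder.forward(...) call replaced by context.forward(...) as shown in the answer above.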