I created a Kafka Streams application using the Processor API.
Here is how I create the topic so that a timestamp is attached to all incoming messages:
kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 --topic topicName --config message.timestamp.type=CreateTime
The workflow processes the incoming messages from the source topic and posts them to the sink topic. For some reason, I see the same timestamp on the messages in both the source and the sink topic.
For example, if the create time of a message in the source topic is T0, it remains T0 in the sink topic as well.
What do I need to do to see an updated timestamp on the sink topic messages?
If you configure a topic with CreateTime, the timestamp stored is the timestamp provided by the producer. For a plain KafkaProducer, if you don't specify the timestamp explicitly, KafkaProducer uses System.currentTimeMillis() as the timestamp and sends the message to the broker.
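The rule for a plain producer can be illustrated with a small stand-alone sketch. This is a simplified model of the behavior described above, not the actual KafkaProducer code; the class name ProducerTimestampSketch, the method resolveTimestamp, and the value used for T0 are made up for illustration. With the real client, the explicit timestamp would be passed through the ProducerRecord constructor that takes a Long timestamp argument.

```java
final class ProducerTimestampSketch {

    // Simplified model: an explicit timestamp wins; otherwise the producer
    // falls back to its own current wall-clock time.
    static long resolveTimestamp(Long explicitTimestamp, long nowMs) {
        return explicitTimestamp != null ? explicitTimestamp : nowMs;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long t0 = 1_500_000_000_000L; // hypothetical input record timestamp (T0)

        // Plain producer, no timestamp given: the record gets "now".
        System.out.println(resolveTimestamp(null, now) == now); // prints true

        // Kafka Streams passes T0 explicitly, so T0 is kept on the output record.
        System.out.println(resolveTimestamp(t0, now) == t0);    // prints true
    }
}
```

The second call is the case from the question: because an explicit timestamp is handed to the producer, a CreateTime sink topic stores T0 unchanged.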
For Kafka Streams, if you read input records with certain timestamps, we have dedicated timestamp inference logic to compute the timestamp for the result records. Thus, Kafka Streams sets the timestamp explicitly when handing the record to the internally used KafkaProducer, so the producer uses this timestamp and not the current wall-clock time. For stream processing, this is usually the desired behavior.
If you have a simple pipeline that just copies data from one topic to another, the timestamp inference will use the input record timestamp as output record timestamp.
There are two things you can do to get different semantics:
- Configure WallClockTimestampExtractor for your Kafka Streams application. In this case, Kafka Streams will not use the embedded record timestamp but the current wall-clock time to derive the timestamp of the output record.
- Configure your output topic with LogAppendTime (message.timestamp.type=LogAppendTime) instead of CreateTime. In this case, the broker always overwrites the record timestamp provided by the producer with the current broker wall-clock time.
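As a rough sketch, the two options translate into the following settings. The class and method names, the application id, and the broker address are placeholders. The key default.timestamp.extractor is assumed for reasonably recent Kafka Streams releases (older releases used timestamp.extractor), and LogAppendTime is the literal value that message.timestamp.type accepts for broker append time. Plain string keys are used so the snippet compiles without the Kafka jars; in a real application you would typically use the StreamsConfig constants instead.

```java
import java.util.Properties;

final class TimestampOptionsSketch {

    // Option 1: make Kafka Streams ignore the embedded record timestamp
    // and use the current wall-clock time instead.
    static Properties wallClockStreamsConfig() {
        Properties props = new Properties();
        props.put("application.id", "timestamp-demo");    // hypothetical id
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.put("default.timestamp.extractor",
                  "org.apache.kafka.streams.processor.WallClockTimestampExtractor");
        return props;
    }

    // Option 2: topic-level setting for the sink topic, so the broker
    // overwrites the producer-supplied timestamp on append.
    static Properties appendTimeTopicConfig() {
        Properties topicConfig = new Properties();
        topicConfig.put("message.timestamp.type", "LogAppendTime");
        return topicConfig;
    }

    public static void main(String[] args) {
        System.out.println(wallClockStreamsConfig());
        System.out.println(appendTimeTopicConfig());
    }
}
```

The first Properties object would be passed to the KafkaStreams constructor along with the topology; the second corresponds to the --config argument used when creating or altering the sink topic. Only one of the two is needed.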