I use org.apache.kafka:kafka-streams:0.10.0.1
I'm attempting to work with a time-series based stream that doesn't seem to be triggering punctuate() in my KStream.process(). (see here for reference)
In a KafkaStreams config I'm passing in this param (among others):

```java
config.put(
    StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
    EventTimeExtractor.class.getName());
```
Here, EventTimeExtractor is a custom timestamp extractor (implementing org.apache.kafka.streams.processor.TimestampExtractor) that extracts the timestamp information from JSON data.
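For context, a minimal sketch of what such an extractor could look like against the 0.10.0 API -- the JSON field name ("timestamp") and the Jackson usage are assumptions for illustration, not details from the question:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical extractor: pulls an epoch-millis "timestamp" field
// out of a JSON string value.
public class EventTimeExtractor implements TimestampExtractor {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        try {
            JsonNode node = MAPPER.readTree((String) record.value());
            // "timestamp" is an assumed field name; adapt to your payload.
            return node.get("timestamp").asLong();
        } catch (Exception e) {
            // Fall back to the record's own timestamp so a malformed
            // record doesn't crash the stream task.
            return record.timestamp();
        }
    }
}
```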
I would expect this to call my object (derived from TimestampExtractor) when each new record is pulled in. The stream in question sees 2 * 10^6 records per minute. I have punctuate() set to 60 seconds and it never fires. I know the data passes this span very frequently since it's pulling old values to catch up.
In fact it never gets called at all.
- Is this the wrong approach to setting timestamps on KStream records?
- Is this the wrong way to declare this configuration?
Update Nov 2017: Kafka Streams in Kafka 1.0 now supports punctuate() with both stream-time and processing-time (wall-clock time) behavior, so you can pick whichever behavior you prefer.

Your setup seems correct to me.
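For readers on Kafka 1.0+, the newer API looks roughly like this -- the class name and intervals here are illustrative, but schedule() with a PunctuationType is the actual 1.0 mechanism:

```java
import java.util.concurrent.TimeUnit;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

// Sketch of the Kafka 1.0 API: schedule() takes a PunctuationType,
// so you can opt into wall-clock punctuation that fires even when
// no new records arrive.
public class MyProcessor implements Processor<String, String> {

    @Override
    public void init(ProcessorContext context) {
        // Fires based on stream-time (advanced only by record timestamps).
        context.schedule(TimeUnit.MINUTES.toMillis(1),
                PunctuationType.STREAM_TIME,
                timestamp -> { /* periodic work keyed to event time */ });

        // Fires based on wall-clock time, regardless of incoming data.
        context.schedule(TimeUnit.MINUTES.toMillis(1),
                PunctuationType.WALL_CLOCK_TIME,
                timestamp -> { /* periodic work keyed to system time */ });
    }

    @Override
    public void process(String key, String value) { /* ... */ }

    @Override
    public void punctuate(long timestamp) { /* deprecated in 1.0; unused */ }

    @Override
    public void close() { }
}
```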
What you need to be aware of: As of Kafka 0.10.0, the punctuate() method operates on stream-time (by default, i.e. based on the default timestamp extractor, stream-time means event-time). And stream-time is only advanced when new data records come in; how much it advances is determined by the timestamps associated with those new records.

For example: say you schedule punctuate() to be called every 1 minute = 60 * 1000 (note: 1 minute of stream-time). Now, if it happens that no data is received for the next 5 minutes, punctuate() will not be called at all -- even though you might expect it to be called 5 times. Why? Again, because punctuate() depends on stream-time, and stream-time is only advanced based on newly received data records.

Might this be causing the behavior you are seeing?
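To make those semantics concrete, here is a tiny toy model (plain Java, not Kafka code) of the rule described above: stream-time advances only with record timestamps, and a punctuation fires once per elapsed interval of stream-time.

```java
import java.util.Arrays;
import java.util.List;

// Toy model of stream-time punctuation: not Kafka internals, just the
// scheduling rule. Stream-time moves only when records arrive.
public class StreamTimePunctuationDemo {

    // Returns how many times a punctuation scheduled every `intervalMs`
    // of stream-time fires for the given record timestamps.
    static int punctuationCount(List<Long> recordTimestamps, long intervalMs) {
        long streamTime = -1;
        long lastPunctuate = -1;
        int fires = 0;
        for (long ts : recordTimestamps) {
            streamTime = Math.max(streamTime, ts); // stream-time only moves forward
            if (lastPunctuate < 0) {
                lastPunctuate = streamTime; // first record initializes the clock
            }
            while (streamTime - lastPunctuate >= intervalMs) {
                fires++;
                lastPunctuate += intervalMs;
            }
        }
        return fires;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        // Record timestamps spanning 5 minutes of event time: fires 5 times.
        System.out.println(punctuationCount(
                Arrays.asList(0L, min, 2 * min, 3 * min, 4 * min, 5 * min), min));
        // Only one record, then silence: stream-time never advances,
        // so punctuate never fires no matter how much wall-clock time passes.
        System.out.println(punctuationCount(Arrays.asList(0L), min));
    }
}
```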
Looking ahead: There's already an ongoing discussion in the Kafka project on how to make punctuate() more flexible, e.g. to trigger it not only based on stream-time (which defaults to event-time) but also based on processing-time.

I think this is another case of issues at the broker level. I went and rebuilt the cluster using instances with more CPU and RAM. Now I'm getting the results I expected.
Note to distant observer(s): if your KStream app is behaving strangely take a look at your brokers and make sure they aren't stuck in GC and have plenty of 'headroom' for file handles, RAM, etc.
Your approach seems to be correct. Compare the paragraph "Timestamp Extractor (timestamp.extractor):" in http://docs.confluent.io/3.0.1/streams/developer-guide.html#optional-configuration-parameters
Not sure why your custom timestamp extractor is not used. Have a look into org.apache.kafka.streams.processor.internals.StreamTask. In the constructor there should be code that instantiates the timestamp extractor from the config. Check if your custom extractor is picked up there or not...