I use org.apache.kafka:kafka-streams:0.10.0.1
I'm working with a time-series-based stream, and the processor I attach via KStream.process() never seems to have its punctuate() method triggered. (see here for reference)
In the KafkaStreams config I'm passing in this param (among others):
config.put(
StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
EventTimeExtractor.class.getName());
Here, EventTimeExtractor is a custom timestamp extractor (that implements org.apache.kafka.streams.processor.TimestampExtractor) to extract the timestamp information from JSON data.
I would expect this to call my object (derived from TimestampExtractor) when each new record is pulled in. The stream in question carries 2 * 10^6 records / minute. I have punctuate() set to 60 seconds and it never fires. I know the data passes this span very frequently, since it's pulling old values to catch up.
In fact it never gets called at all.
- Is this the wrong approach to setting timestamps on KStream records?
- Is this the wrong way to declare this configuration?
Update Nov 2017: Kafka Streams in Kafka 1.0 now supports punctuate() with both stream-time and processing-time (wall-clock time) behavior, so you can pick whichever behavior you prefer.
Your setup seems correct to me.
What you need to be aware of: as of Kafka 0.10.0, the punctuate() method operates on stream-time (by default, i.e. with the default timestamp extractor, stream-time means event-time). Stream-time only advances when new data records come in, and how far it advances is determined by the timestamps of those new records.
For example:
- Let's assume you have set punctuate() to be called every 1 minute = 60 * 1000 ms (note: 1 minute of stream-time). Now, if it happens that no data is received for the next 5 minutes, punctuate() will not be called at all, even though you might expect it to be called 5 times. Why? Again, because punctuate() depends on stream-time, and stream-time only advances based on newly received data records.
Might this be causing the behavior you are seeing?
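To make the stream-time behavior concrete, here is a toy simulation in plain Java. It is not Kafka Streams code and calls no Kafka API: the class StreamTimePunctuationSketch and its methods are hypothetical names, and the catch-up rule (fire once per elapsed interval) is a simplifying assumption that may differ from what Kafka does internally. It only illustrates that punctuation is driven by record timestamps rather than by the wall clock.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of stream-time punctuation. NOT the Kafka Streams implementation. */
final class StreamTimePunctuationSketch {

    private final long intervalMs;
    private boolean started = false;
    private long nextPunctuateAt;
    // Stream-time values at which the simulated punctuate() fired.
    private final List<Long> fired = new ArrayList<>();

    StreamTimePunctuationSketch(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    /** Called once per record, with the timestamp an extractor would return. */
    void onRecord(long recordTimestampMs) {
        if (!started) {
            // The first record anchors the schedule (assumption of this toy model).
            started = true;
            nextPunctuateAt = recordTimestampMs + intervalMs;
            return;
        }
        // Stream-time moves only here, i.e. only when a record arrives.
        while (recordTimestampMs >= nextPunctuateAt) {
            fired.add(nextPunctuateAt);
            nextPunctuateAt += intervalMs;
        }
    }

    List<Long> fired() {
        return fired;
    }

    public static void main(String[] args) {
        StreamTimePunctuationSketch sketch = new StreamTimePunctuationSketch(60_000L);
        sketch.onRecord(0L);
        sketch.onRecord(20_000L);
        // However much wall-clock time passes here, nothing fires without new records.
        System.out.println("fired before new data: " + sketch.fired());
        sketch.onRecord(65_000L); // a record whose timestamp crosses the 60 s mark
        System.out.println("fired after new data:  " + sketch.fired());
    }
}
```

In this model, an idle input or an extractor that keeps returning timestamps inside the same minute would both leave the fired list empty, which matches the symptom described in the question.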
Looking ahead: There's already an ongoing discussion in the Kafka project on how to make punctuate() more flexible, e.g. to have it trigger not only based on stream-time (which defaults to event-time) but also based on processing-time.
Your approach seems to be correct. Compare the paragraph "Timestamp Extractor (timestamp.extractor):" in http://docs.confluent.io/3.0.1/streams/developer-guide.html#optional-configuration-parameters
I'm not sure why your custom timestamp extractor is not used. Have a look at org.apache.kafka.streams.processor.internals.StreamTask. In the constructor there should be something like
TimestampExtractor timestampExtractor1 = (TimestampExtractor)config.getConfiguredInstance("timestamp.extractor", TimestampExtractor.class);
Check if your custom extractor is picked up there or not...
I think this is another case of issues at the broker level. I went and rebuilt the cluster using instances with more CPU and RAM. Now I'm getting the results I expected.
Note to distant observer(s): if your KStream app is behaving strangely, take a look at your brokers and make sure they aren't stuck in GC and have plenty of 'headroom' for file handles, RAM, etc.