Kafka - problems with TimestampExtractor

Posted 2019-04-08 03:54

Question:

I use org.apache.kafka:kafka-streams:0.10.0.1

I'm attempting to work with a time-series-based stream, but the processor I attach with KStream.process() never seems to have its punctuate() triggered.

In a KafkaStreams config I'm passing in this param (among others):

config.put(
  StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
  EventTimeExtractor.class.getName());

Here, EventTimeExtractor is a custom timestamp extractor (that implements org.apache.kafka.streams.processor.TimestampExtractor) to extract the timestamp information from JSON data.
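For illustration, here is a minimal sketch of the parsing step such an extractor might delegate to. It is not the asker's actual EventTimeExtractor: the class name JsonTimestampParser, the field name "timestamp", and the epoch-millisecond format are all assumptions, and the regex is only a stand-in for a real JSON parser. It uses the JDK alone so it can be tried without Kafka on the classpath.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: pulls an epoch-millisecond timestamp out of a JSON string.
// Assumption: the payload carries a numeric field named "timestamp".
final class JsonTimestampParser {
    // Matches e.g.  "timestamp": 1554695640000
    private static final Pattern TS_FIELD =
            Pattern.compile("\"timestamp\"\\s*:\\s*(\\d+)");

    /** Returns the embedded event time in epoch millis, or -1 if none is found. */
    static long parseEpochMillis(String json) {
        if (json == null) {
            return -1L;
        }
        Matcher m = TS_FIELD.matcher(json);
        if (!m.find()) {
            return -1L;
        }
        try {
            return Long.parseLong(m.group(1));
        } catch (NumberFormatException e) {
            return -1L; // digits present but too large for a long
        }
    }
}
```

A class implementing TimestampExtractor would call something like parseEpochMillis(...) on the record value inside its extract method and decide what to do with -1, for example falling back to the current wall-clock time.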

I would expect this to call my object (derived from TimestampExtractor) when each new record is pulled in. The stream in question is 2 * 10^6 records / minute. I have punctuate() set to 60 seconds and it never fires. I know the data passes this span very frequently, since it's pulling old values to catch up.

In fact it never gets called at all.

  • Is this the wrong approach to setting timestamps on KStream records?
  • Is this the wrong way to declare this configuration?

Answer 1:

Update Nov 2017: Kafka Streams in Kafka 1.0 now supports punctuate() with both stream-time and with processing-time (wall clock time) behavior. So you can pick whichever behavior you prefer.

Your setup seems correct to me.

What you need to be aware of: As of Kafka 0.10.0, the punctuate() method operates on stream-time (by default, i.e. based on the default timestamp extractor, stream-time will mean event-time). And the stream-time is only advanced when new data records are coming in, and how much the stream-time is advanced is determined by the associated timestamps of these new records.

For example:

  • Let's assume you have set punctuate() to be called every 1 minute = 60 * 1000 ms (note: 1 minute of stream-time). Now, if no data is received for the next 5 minutes, punctuate() will not be called at all, even though you might expect it to be called 5 times. Why? Again, because punctuate() depends on stream-time, and stream-time only advances when new data records arrive.
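The behavior in this example can be sketched with a small, deliberately simplified model. This is not Kafka's internal implementation; the class StreamTimePunctuationSketch and its methods are hypothetical, and the model assumes a single partition whose first record starts the punctuation schedule. It only shows that punctuations are driven by record timestamps, never by the wall clock.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of stream-time punctuation (not Kafka's code).
final class StreamTimePunctuationSketch {
    private final long intervalMs;
    private boolean started = false;
    private long streamTime;        // highest record timestamp seen so far
    private long nextPunctuation;   // stream-time at which punctuate() is next due
    private final List<Long> fired = new ArrayList<>();

    StreamTimePunctuationSketch(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    /** Feed the extracted timestamp of one record. Only this advances stream-time. */
    void onRecord(long recordTimestampMs) {
        if (!started) {
            started = true;
            streamTime = recordTimestampMs;
            nextPunctuation = recordTimestampMs + intervalMs;
        }
        streamTime = Math.max(streamTime, recordTimestampMs);
        // Catch up on every punctuation that stream-time has now passed.
        while (streamTime >= nextPunctuation) {
            fired.add(nextPunctuation);
            nextPunctuation += intervalMs;
        }
    }

    // There is intentionally no method for "wall-clock time elapsed":
    // in this model, idle time without records changes nothing.

    List<Long> fired() {
        return fired;
    }

    public static void main(String[] args) {
        StreamTimePunctuationSketch task = new StreamTimePunctuationSketch(60_000L);
        task.onRecord(0L);        // schedule starts; next punctuation due at 60 s
        task.onRecord(30_000L);   // stream-time 30 s: nothing due yet
        task.onRecord(125_000L);  // stream-time jumps past 60 s and 120 s
        System.out.println(task.fired()); // prints [60000, 120000]
    }
}
```

However long the application sits idle between the second and third record, nothing fires until the third record arrives and moves stream-time forward.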

Might this be causing the behavior you are seeing?

Looking ahead: There's already an ongoing discussion in the Kafka project on how to make punctuate() more flexible, e.g. to have it triggered not only based on stream-time (which defaults to event-time) but also based on processing-time.



Answer 2:

Your approach seems to be correct. Compare the paragraph "Timestamp Extractor (timestamp.extractor):" in http://docs.confluent.io/3.0.1/streams/developer-guide.html#optional-configuration-parameters

Not sure why your custom timestamp extractor is not used. Have a look at org.apache.kafka.streams.processor.internals.StreamTask. In the constructor there should be something like

TimestampExtractor timestampExtractor1 = (TimestampExtractor)config.getConfiguredInstance("timestamp.extractor", TimestampExtractor.class);

Check if your custom extractor is picked up there or not...
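If stepping through StreamTask in a debugger is inconvenient, a rough pre-check is to confirm that the configured class name can be loaded and instantiated reflectively at all. The sketch below is an assumption-laden stand-in, not Kafka code: it assumes the config loader creates the extractor through a public no-argument constructor (which is how I understand getConfiguredInstance to behave), and the helper name ExtractorConfigCheck is made up for illustration.

```java
// Hypothetical helper: checks whether a class name from the config could be
// loaded and instantiated by a reflection-based loader.
final class ExtractorConfigCheck {

    /** Returns null if the class looks usable, otherwise a description of the problem. */
    static String diagnose(String className, Class<?> expectedType) {
        try {
            Class<?> c = Class.forName(className);
            if (!expectedType.isAssignableFrom(c)) {
                return className + " does not implement " + expectedType.getName();
            }
            // Assumption: the loader needs an accessible no-argument constructor.
            c.getDeclaredConstructor().newInstance();
            return null;
        } catch (ClassNotFoundException e) {
            return "class not found on classpath: " + className;
        } catch (ReflectiveOperationException e) {
            return "cannot instantiate " + className + ": " + e;
        }
    }
}
```

In the application this would be called as diagnose(EventTimeExtractor.class.getName(), TimestampExtractor.class); a non-null result points at a packaging or constructor problem rather than at Kafka Streams itself.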



Answer 3:

I think this is another case of issues at the broker level. I went and rebuilt the cluster using instances with more CPU and RAM. Now I'm getting the results I expected.

Note to distant observer(s): if your KStream app is behaving strangely take a look at your brokers and make sure they aren't stuck in GC and have plenty of 'headroom' for file handles, RAM, etc.
