I need to read the messages in a given time range out of a Kafka topic. The solution I can think of is to first find the first offset at or after the beginning of the time range, and then keep consuming messages until the timestamps on all partitions pass the end of the time range. Is there any better approach to this problem? Thanks!
Tags: apache-kafka
Well, you definitely have to first search for the first offset that fits the start of the time range.
This can be done using the `KafkaConsumer#offsetsForTimes` method.
The method accepts a `Map<TopicPartition, Long>` (the values being timestamps) and returns a `Map<TopicPartition, OffsetAndTimestamp>`, where the timestamp in each `OffsetAndTimestamp` is that of the first message with a timestamp equal to or greater than the one specified. From there, you can seek your consumer to the returned offsets and iterate until the timestamps in the records exceed the end of your time range.
Some pseudo code:
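A minimal Java sketch of the approach described above; the topic name, broker address, and range bounds are placeholders:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class TimeRangeReader {

    // True when a record timestamp falls inside [start, end].
    static boolean inRange(long ts, long start, long end) {
        return ts >= start && ts <= end;
    }

    public static void main(String[] args) {
        String topic = "my-topic";         // assumed topic name
        long startTs = 1_600_000_000_000L; // assumed range bounds (epoch millis)
        long endTs   = 1_600_000_600_000L;

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "time-range-reader");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Ask every partition for the first offset at or after startTs
            Map<TopicPartition, Long> query = new HashMap<>();
            for (PartitionInfo p : consumer.partitionsFor(topic)) {
                query.put(new TopicPartition(topic, p.partition()), startTs);
            }
            Map<TopicPartition, OffsetAndTimestamp> startOffsets =
                    consumer.offsetsForTimes(query);

            // Seek each partition to its starting offset; a null entry means
            // that partition has no message at or after startTs
            consumer.assign(startOffsets.keySet());
            Set<TopicPartition> active = new HashSet<>();
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> e : startOffsets.entrySet()) {
                if (e.getValue() != null) {
                    consumer.seek(e.getKey(), e.getValue().offset());
                    active.add(e.getKey());
                }
            }

            // Consume until every partition has produced a record past endTs
            while (!active.isEmpty()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> r : records) {
                    TopicPartition tp = new TopicPartition(r.topic(), r.partition());
                    if (r.timestamp() > endTs) {
                        active.remove(tp);
                        consumer.pause(Collections.singleton(tp));
                    } else if (inRange(r.timestamp(), startTs, endTs)) {
                        System.out.printf("%s@%d: %s%n", tp, r.offset(), r.value());
                    }
                }
            }
        }
    }
}
```

Note this assumes record timestamps are monotonically non-decreasing per partition (true for `LogAppendTime`, only approximately true for `CreateTime`), so with producer-set timestamps you may want to keep reading a little past `endTs` before pausing a partition.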
What do you mean by "time range"?
The time the messages were put on the queue, or a timestamp inside the messages themselves? :-)
What I would consider is using Kafka Streams with a windowed stream: take messages out of the stream, and if the current timestamp of a taken message falls within the range, process it; otherwise just ignore it.
On the other hand, if you mean timestamps within the message, then a small extension to the streaming topology (the `.filter()` method in the Java DSL) will filter out messages for you nicely. You just need to formulate a good predicate.
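A sketch of that `.filter()` idea, assuming the business timestamp is carried in the message value as a long-parsable string; the topic names, broker address, and range bounds are placeholders:

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class TimeRangeStream {

    // The predicate: true when the payload timestamp lies inside [start, end].
    static boolean inRange(long ts, long start, long end) {
        return ts >= start && ts <= end;
    }

    public static void main(String[] args) {
        long startTs = 1_600_000_000_000L; // assumed range bounds (epoch millis)
        long endTs   = 1_600_000_600_000L;

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("my-topic"); // assumed input topic
        source.filter((key, value) -> inRange(Long.parseLong(value), startTs, endTs))
              .to("my-topic-in-range"); // assumed output topic

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "time-range-filter");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        new KafkaStreams(builder.build(), props).start();
    }
}
```

In a real payload you would deserialize the value (JSON, Avro, etc.) and pull the timestamp field out in the predicate instead of parsing the whole value as a long.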
See: Kafka Streams (Confluent) and Kafka Streams (Apache)