I need to get the messages produced in Kafka hour by hour over a day. Every hour I will launch a job to consume the messages produced in the previous hour. E.g., if the current time is 20:12, I will consume the messages produced between 19:00:00 and 19:59:59. That means I need to get the start offset for time 19:00:00 and the end offset for time 19:59:59. I used SimpleConsumer.getOffsetsBefore as shown in the "0.8.0 SimpleConsumer Example". The problem is that the returned offset does not match the timestamp passed as a parameter. E.g., when I pass the timestamp 19:00:00, I get a message produced at 16:38:00.
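The window arithmetic described in the question can be sketched in Java as follows. This is a minimal sketch, assuming the job knows which time zone (ZoneId) its hours are defined in, and treating the window as half-open, [19:00:00.000, 20:00:00.000), rather than ending at 19:59:59: Kafka timestamps are in milliseconds, so an inclusive end of 19:59:59 would miss a message stamped 19:59:59.500. The class name HourWindow is hypothetical.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;

/** Hypothetical helper: the [start, end) window covering the previous full clock hour. */
final class HourWindow {
    final long startMillis; // inclusive, e.g. 19:00:00.000
    final long endMillis;   // exclusive, e.g. 20:00:00.000

    private HourWindow(long startMillis, long endMillis) {
        this.startMillis = startMillis;
        this.endMillis = endMillis;
    }

    /** For now = 20:12 this yields [19:00:00.000, 20:00:00.000) in the given zone. */
    static HourWindow previousHour(Instant now, ZoneId zone) {
        // Truncate "now" to the top of the current hour in the given zone (20:12 -> 20:00).
        ZonedDateTime currentHour = now.atZone(zone).truncatedTo(ChronoUnit.HOURS);
        // minusHours works on the instant timeline, so this is exactly one hour earlier.
        ZonedDateTime previousHour = currentHour.minusHours(1);
        return new HourWindow(previousHour.toInstant().toEpochMilli(),
                              currentHour.toInstant().toEpochMilli());
    }
}
```

The two epoch-millisecond values are what a timestamp-to-offset lookup would be given as its start and end times.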
As the other replies note, older versions of Kafka had only an approximate way of mapping times to offsets. However, since Kafka 0.10.0 (released in May 2016) messages carry timestamps, and starting with 0.10.1 the broker maintains a time-based index for each partition's log segments. This lets you efficiently map a time to the earliest offset whose timestamp is at or after that time. You can use the KafkaConsumer#offsetsForTimes method to access this information.
There are more details about how the time-based index is implemented on the KIP-33 design discussion page.
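The semantics of that lookup can be illustrated with a simplified in-memory model. This is not Kafka's implementation (the real index is sparse and stored on disk); it only mirrors what the KafkaConsumer#offsetsForTimes JavaDoc describes: per partition, the earliest offset whose timestamp is greater than or equal to the requested timestamp, or null if there is none. The class, the Entry type, and the offsetRange helper are all hypothetical. In a real consumer you would call offsetsForTimes once with the window start and once with the window end, seek to the first result, and read up to (but excluding) the second, falling back to the partition's end offset when the lookup returns null.

```java
import java.util.List;
import java.util.OptionalLong;

/** Hypothetical, simplified model of a timestamp -> offset lookup for ONE partition. */
final class TimeLookupModel {
    /** One message in the partition: its offset and its timestamp in epoch millis. */
    static final class Entry {
        final long offset;
        final long timestamp;
        Entry(long offset, long timestamp) { this.offset = offset; this.timestamp = timestamp; }
    }

    /**
     * Earliest offset (scanning in offset order) whose timestamp is >= target,
     * or empty when no such message exists (offsetsForTimes returns null then).
     */
    static OptionalLong earliestOffsetAtOrAfter(List<Entry> log, long target) {
        for (Entry e : log) {
            if (e.timestamp >= target) {
                return OptionalLong.of(e.offset);
            }
        }
        return OptionalLong.empty();
    }

    /**
     * Offsets [start, end) covering timestamps [startTs, endTs). logEndOffset is used
     * when no message is at or after a bound. If producer-assigned timestamps are out of
     * order, the range is only approximately the time window.
     */
    static long[] offsetRange(List<Entry> log, long startTs, long endTs, long logEndOffset) {
        long start = earliestOffsetAtOrAfter(log, startTs).orElse(logEndOffset);
        long end = earliestOffsetAtOrAfter(log, endTs).orElse(logEndOffset);
        return new long[] {start, end};
    }
}
```

With timestamps 100, 200, 300, 400, 500 at offsets 0 to 4, the window [200, 400) maps to offsets [1, 3), i.e. the messages at offsets 1 and 2.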
Kafka 0.10 does support timestamps, although it is still a bit of a challenge to use them for what you want to do. But if you know from which timestamp you want to start reading and until which timestamp you want to read, you can simply poll messages until you reach the end time and then stop consuming.
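The "poll until that time, then stop" approach can be sketched as below for a single partition. It is a model, not client code: the Rec type is a hypothetical stand-in for a consumed record, and each inner list stands in for one poll() batch. It assumes records arrive in offset order, which Kafka guarantees only within a partition, so a real consumer reading several partitions would track the stop condition per partition.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Sketch of "poll until you pass the end time, then stop" over ONE partition. */
final class PollUntil {
    /** Hypothetical stand-in for a consumed record: offset, timestamp (epoch ms), value. */
    static final class Rec {
        final long offset;
        final long timestamp;
        final String value;
        Rec(long offset, long timestamp, String value) {
            this.offset = offset;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /**
     * Walks the batches in order, keeps records with startTs <= timestamp < endTs,
     * and stops at the first record whose timestamp is at or after endTs.
     */
    static List<Rec> readWindow(Iterator<List<Rec>> polls, long startTs, long endTs) {
        List<Rec> out = new ArrayList<>();
        while (polls.hasNext()) {
            for (Rec r : polls.next()) {   // one simulated poll() batch
                if (r.timestamp >= endTs) {
                    return out;           // passed the end of the window: stop consuming
                }
                if (r.timestamp >= startTs) {
                    out.add(r);           // inside [startTs, endTs)
                }
            }
        }
        return out;                       // ran out of data before reaching endTs
    }
}
```

Stopping at the first timestamp past the end is simple but assumes timestamps increase with offset; with producer-assigned (CreateTime) timestamps a late record could appear after the stop point, so stopping at an end offset obtained from a timestamp lookup is the more robust variant.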
In Kafka versions before 0.10 there is no way to get an offset that corresponds to a particular timestamp, and this is by design. As described near the top of Jay Kreps's Log Article, the offset number provides a sort of timestamp for the log that is decoupled from wall-clock time. With the offset as your notion of time, you can tell whether any two systems are in a consistent state just by knowing which offset each has read up to. There is never any confusion about different clock times on different servers, leap years, daylight saving time, time zones, etc. It's kinda nice...
NOW... all that said, if you know your server went down at some time X, then practically speaking you would really like to know the corresponding offset. You can get close. The log segment files on the Kafka brokers are named after the first offset they contain, and their modification times tell you roughly when they were written; there is a Kafka tool (that I can't find right now) that lets you see which offsets are stored in these files. This segment-level granularity is also why getOffsetsBefore can return an offset hours earlier than the time you asked for. If you want the exact timestamp, though, you must encode the timestamp in the messages that you send to Kafka.
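Encoding the timestamp in the message itself can be as simple as prefixing the payload with it. The format below (an 8-byte big-endian epoch-millisecond timestamp followed by the original payload bytes) and the class name TimestampedPayload are hypothetical choices for illustration, not a Kafka convention; the producer would call encode before sending and the consumer would call timestampOf to decide whether a message falls in the wanted window.

```java
import java.nio.ByteBuffer;

/** Hypothetical wire format: 8-byte big-endian epoch-millis timestamp, then the payload. */
final class TimestampedPayload {
    /** Prepends the timestamp to the payload bytes. */
    static byte[] encode(long timestampMillis, byte[] payload) {
        return ByteBuffer.allocate(Long.BYTES + payload.length)
                .putLong(timestampMillis)   // ByteBuffer is big-endian by default
                .put(payload)
                .array();
    }

    /** Reads the timestamp back from the first 8 bytes. */
    static long timestampOf(byte[] message) {
        return ByteBuffer.wrap(message).getLong();
    }

    /** Returns the original payload (everything after the 8-byte prefix). */
    static byte[] payloadOf(byte[] message) {
        byte[] payload = new byte[message.length - Long.BYTES];
        System.arraycopy(message, Long.BYTES, payload, 0, payload.length);
        return payload;
    }
}
```

On Kafka 0.10 and later each record already carries a timestamp set by the producer or the broker, so a hand-rolled prefix like this is mainly relevant to older clients and brokers.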
The Kafka consumer API method KafkaConsumer#offsetsForTimes() can be used for this; it is available from version 0.10.1 onward. See the JavaDoc.