How to read messages from a Kafka topic within a given time range

Posted 2020-06-03 07:24

I need to read the messages within a given time range from a Kafka topic. The solution I can think of is to first find the offset at which the time range begins, and then keep consuming messages until the offsets on all partitions have moved past the end of the time range. Is there a better approach to this problem? Thanks!

2 answers
我想做一个坏孩纸
#2 · 2020-06-03 08:06

Well, you definitely have to start by finding the first offset that falls at or after the beginning of the time range.

This can be done using the KafkaConsumer#offsetsForTimes method.

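The method accepts a Map<TopicPartition, Long> that maps each partition to a timestamp (milliseconds since the epoch), and returns a Map<TopicPartition, OffsetAndTimestamp> where each OffsetAndTimestamp refers to the earliest message whose timestamp is equal to or greater than the one specified. If a partition has no such message, its value in the returned map is null.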
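To make the lookup semantics concrete, here is a plain-Java model of what offsetsForTimes returns for a single partition: the earliest offset whose record timestamp is greater than or equal to the target, or nothing when no such record exists. The partition is modeled as a hypothetical in-memory array of timestamps indexed by offset, and the class and method names (OffsetLookup, offsetForTime) are made up for illustration. This only mirrors the documented result; the broker answers the real request from its time index rather than by scanning.

```java
import java.util.OptionalLong;

// Hypothetical in-memory model of one partition: timestamps[i] is the
// timestamp (epoch millis) of the record stored at offset i.
class OffsetLookup {
    // Earliest offset whose timestamp is >= target, mirroring the documented
    // result of KafkaConsumer#offsetsForTimes for a single partition.
    static OptionalLong offsetForTime(long[] timestamps, long target) {
        for (int offset = 0; offset < timestamps.length; offset++) {
            if (timestamps[offset] >= target) {
                return OptionalLong.of(offset);
            }
        }
        // The real API maps the partition to null in this case.
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        long[] timestamps = {1000, 2000, 2000, 3500, 5000};
        System.out.println(offsetForTime(timestamps, 2000)); // OptionalLong[1]
        System.out.println(offsetForTime(timestamps, 2500)); // OptionalLong[3]
        System.out.println(offsetForTime(timestamps, 6000)); // OptionalLong.empty
    }
}
```

Note that an exact timestamp match returns the first record carrying that timestamp, and a target between two timestamps returns the next record after it.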

From there, assign the partition to your consumer, seek to the returned offset, and keep polling until a record's timestamp exceeds the end of your time range.

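Some example code for a single partition (partition 0):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class TimeRangeReader {
    public static void main(String[] args) {
        String topic = args[0];
        long timestampBeginning = Long.parseLong(args[1]); // epoch millis
        long timestampEnd = Long.parseLong(args[2]);       // epoch millis
        TopicPartition partition = new TopicPartition(topic, 0);

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            OffsetAndTimestamp start = consumer.offsetsForTimes(
                    Collections.singletonMap(partition, timestampBeginning)).get(partition);
            if (start == null) {
                return; // no record with timestamp >= timestampBeginning
            }
            // Stop here if no record past timestampEnd has been written yet
            long endOffset = consumer.endOffsets(Collections.singleton(partition)).get(partition);

            consumer.assign(Collections.singleton(partition)); // must assign before seeking
            consumer.seek(partition, start.offset());

            boolean done = false;
            while (!done && consumer.position(partition) < endOffset) {
                for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(500))) {
                    if (record.timestamp() > timestampEnd) {
                        done = true; // past the end of the range
                        break;
                    }
                    // handle record
                }
            }
        }
    }
}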
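The single-partition example above only reads partition 0, while the question asks about stopping once every partition is past the end of the range. The sketch below models that multi-partition logic in plain Java, with each partition represented as a hypothetical in-memory array of record timestamps (the TimeRangeScan class and its read method are made-up names). Against a real cluster, you would presumably get all start offsets from one offsetsForTimes call covering every partition (for example, built from consumer.partitionsFor(topic)) and keep a per-partition "done" flag while polling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model: each partition is an array of record timestamps
// (epoch millis) indexed by offset.
class TimeRangeScan {
    // Returns "partition:offset" for every record read in [begin, end].
    // Each partition stops independently at its first record past end.
    static List<String> read(Map<Integer, long[]> partitions, long begin, long end) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<Integer, long[]> e : partitions.entrySet()) {
            long[] ts = e.getValue();
            // Step 1: start at the earliest offset with timestamp >= begin.
            int offset = 0;
            while (offset < ts.length && ts[offset] < begin) {
                offset++;
            }
            // Step 2: consume forward until a timestamp exceeds end
            // or the partition runs out of records.
            for (; offset < ts.length && ts[offset] <= end; offset++) {
                result.add(e.getKey() + ":" + offset);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, long[]> partitions = new TreeMap<>();
        partitions.put(0, new long[]{100, 200, 300, 400});
        partitions.put(1, new long[]{150, 250, 350});
        System.out.println(read(partitions, 200, 300)); // [0:1, 0:2, 1:1]
    }
}
```

One caveat: with producer-assigned (CreateTime) timestamps, records within a partition are not guaranteed to be in timestamp order, so stopping at the first record past the end can skip a later record that is still in range.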
小情绪 Triste *
#3 · 2020-06-03 08:14

What do you mean by "time range"?

A time range of when the messages entered the queue, or of timestamps carried inside the messages? :-)

In the first case, I would consider using Kafka Streams with a windowed stream: take each message off the stream and, if its timestamp falls within the range, process it; otherwise, just ignore it.
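One way to picture the windowed approach is with tumbling windows, which in Kafka Streams are aligned to the epoch: a record with timestamp ts lands in the window starting at ts - (ts % size). The plain-Java sketch below assigns timestamps to such windows and keeps only windows lying entirely inside [begin, end). It is an illustration of the window arithmetic under that assumption, not Kafka Streams code; the class and method names (TumblingWindows, windowsInRange) are hypothetical, and non-negative timestamps are assumed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class TumblingWindows {
    // Start of the epoch-aligned tumbling window containing ts (ts >= 0 assumed).
    static long windowStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }

    // Groups timestamps by window start, keeping only windows that lie
    // entirely within [begin, end); records in other windows are ignored.
    static Map<Long, List<Long>> windowsInRange(long[] timestamps, long sizeMs, long begin, long end) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : timestamps) {
            long start = windowStart(ts, sizeMs);
            if (start >= begin && start + sizeMs <= end) {
                windows.computeIfAbsent(start, k -> new ArrayList<>()).add(ts);
            }
        }
        return windows;
    }

    public static void main(String[] args) {
        long[] ts = {50, 120, 180, 260, 340};
        System.out.println(windowsInRange(ts, 100, 100, 300)); // {100=[120, 180], 200=[260]}
    }
}
```

This works cleanly only when the range boundaries line up with window boundaries; otherwise a per-record timestamp check is simpler.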

On the other hand, if you mean timestamps inside the messages, a small extension to the stream (the .filter() method in the Java DSL) will filter the messages out for you nicely. You just need to formulate a good predicate.
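A predicate of that kind might look like the sketch below. It assumes a hypothetical payload type Event that carries its own event time in epoch millis; the Event, PayloadTimeFilter, and inRange names are illustrative only. The predicate is a java.util.function.BiPredicate taking (key, value), the same shape as the Kafka Streams Predicate, so with a KStream<String, Event> it could presumably be passed as stream.filter(predicate::test).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

class PayloadTimeFilter {
    // Hypothetical payload type carrying its own event timestamp (epoch millis).
    static final class Event {
        final long eventTimeMs;
        final String body;

        Event(long eventTimeMs, String body) {
            this.eventTimeMs = eventTimeMs;
            this.body = body;
        }
    }

    // Keeps records whose payload timestamp lies in [begin, end] (inclusive).
    static BiPredicate<String, Event> inRange(long begin, long end) {
        return (key, event) -> event.eventTimeMs >= begin && event.eventTimeMs <= end;
    }

    public static void main(String[] args) {
        BiPredicate<String, Event> predicate = inRange(1000, 2000);
        Event[] events = {new Event(500, "a"), new Event(1500, "b"), new Event(2500, "c")};
        List<String> kept = new ArrayList<>();
        for (Event e : events) {
            if (predicate.test("key", e)) {
                kept.add(e.body);
            }
        }
        System.out.println(kept); // [b]
    }
}
```

Unlike the offset-based approach, a filter still reads every record in the topic and only discards those outside the range.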

See: Kafka Streams (Confluent) and Kafka Streams (Apache)
