I am testing the Kafka High Level Consumer using the ConsumerGroupExample code from the Kafka site. I would like to retrieve all the existing messages on the topic called "test" that I have in the Kafka server config. According to other blogs, auto.offset.reset should be set to "smallest" to get all messages:
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
    Properties props = new Properties();
    props.put("zookeeper.connect", a_zookeeper);
    props.put("group.id", a_groupId);
    props.put("auto.offset.reset", "smallest");
    props.put("zookeeper.session.timeout.ms", "10000");
    return new ConsumerConfig(props);
}
The question I really have is this: what is the Java API call for the High Level Consumer that is equivalent to:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
These properties will help you.
Basically, every time a consumer with a new group ID starts consuming a topic, it reads messages from the beginning, because the new group has no committed offsets and auto.offset.reset then applies. If you only need to consume from the beginning for testing purposes, initialise your consumer with a new group ID on each run. Here's how I did it:
and read messages from the beginning each time!
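The new-group-ID approach described in this answer can be sketched roughly as follows. This is only an illustration, not the answerer's original code: the class name FreshGroupConfig, the "test-" prefix, and the use of a random UUID as the group ID are assumptions; the property keys are the ones from the question.

```java
import java.util.Properties;
import java.util.UUID;

public class FreshGroupConfig {

    // Builds properties for the old ZooKeeper-based high-level consumer.
    // The class name and the "test-" prefix are hypothetical, for illustration only.
    public static Properties create(String zookeeper) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        // A group ID that has never been used has no committed offsets,
        // so auto.offset.reset decides where consumption starts.
        props.put("group.id", "test-" + UUID.randomUUID());
        // "smallest" means: start from the earliest available offset.
        props.put("auto.offset.reset", "smallest");
        props.put("zookeeper.session.timeout.ms", "10000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = create("localhost:2181");
        // Each run prints a different group ID.
        System.out.println("group.id = " + props.getProperty("group.id"));
    }
}
```

The resulting Properties object would be passed to new ConsumerConfig(props), as in the createConsumerConfig method from the question. Note that every run leaves a new consumer group behind in ZooKeeper, so this is probably best kept to testing.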
To fetch messages from the beginning, you can do this:
then just follow the routine work...
Looks like you need to use the "low level SimpleConsumer API"
This example worked for getting all messages from a topic with the following arguments (note that the port is the Kafka port, not the ZooKeeper port; the topics were set up from this example):
Specifically, there is a method to get readOffset which takes kafka.api.OffsetRequest.EarliestTime():
Here is another post that may provide some alternative ideas on how to sort this out: How to get data from old offset point in Kafka?