I am implementing a Kafka consumer class to receive messages. I want to get only the new messages each time, so I set enable.auto.commit to true. However, the offset does not seem to change at all, even though the topic, consumer group, and partition have always been the same.
Here is my consumer code:
consumerConfig.put("bootstrap.servers", bootstrap);
consumerConfig.put("group.id", KafkaTestConstants.KAFKA_GROUP);
consumerConfig.put("enable.auto.commit", "true");
consumerConfig.put("auto.offset.reset", "earliest");
consumerConfig.put("auto.commit.interval.ms", 1000);
consumerConfig.put("key.deserializer", StringDeserializer.class.getName());
consumerConfig.put("value.deserializer", StringDeserializer.class.getName());
StringDeserializer deserializer = new StringDeserializer();
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig, deserializer, deserializer);
TopicPartition tp = new TopicPartition(KafkaTestConstants.KAFKA_TOPIC, 0);
List<TopicPartition> tps = Arrays.asList(tp);
kafkaConsumer.assign(tps);
long offset = kafkaConsumer.position(tp);
System.out.println("Offset of partition: " + offset);
List<String> messages = new ArrayList<String>();
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
    System.out.println("Message received: " + record.value());
    messages.add(record.value());
}
kafkaConsumer.commitAsync();
System.out.println("Offset committed.\n");
kafkaConsumer.close();
No matter how many times I run it, it always shows that the offset is 0, so it always receives all messages from the very beginning. What am I missing?
EDIT: Based on Matthias's answer, I decided to manually commit the offset. However commitSync() would hang. commitAsync() sort of works. I will explain the "sort of" later. Here is what the code does:
producer sends 2 messages;
consumer initiates;
print out current position;
consumer.poll();
print received messages;
consumer.commitAsync();
This is how the code behaves. Say the topic already holds 100 messages, and the producer then sends 2 new ones. Before the consumer polls, it reports the current offset position as 102, when it should be 100. Therefore, no new messages are printed. It is almost as if the offset is updated right after the producer sends the messages.
Auto commit only works if you use consumer group management, and for this, you need to "subscribe" to a topic, but not "assign" partitions manually.
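To make the difference concrete, here is a minimal sketch of the same consumer using subscribe() instead of assign(). It is only an illustration, not a drop-in fix: the broker address, group name, and topic name are placeholders, and it assumes kafka-clients 2.0 or newer for poll(Duration) (on 0.10.2 the equivalent call is poll(long)).

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SubscribeExample {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        config.put("group.id", "example-group");           // placeholder group name
        config.put("enable.auto.commit", "true");
        config.put("auto.commit.interval.ms", "1000");
        config.put("auto.offset.reset", "earliest");       // only used when the group has no committed offset
        config.put("key.deserializer", StringDeserializer.class.getName());
        config.put("value.deserializer", StringDeserializer.class.getName());

        // try-with-resources closes the consumer at the end of the block
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config)) {
            // subscribe() hands partition assignment to the consumer group,
            // instead of pinning a TopicPartition with assign()
            consumer.subscribe(Collections.singletonList("example-topic")); // placeholder topic

            // Poll several times: the first calls may return nothing while
            // the consumer is still joining the group.
            for (int i = 0; i < 10; i++) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("offset " + record.offset() + ": " + record.value());
                }
            }
        }
    }
}
```

Running this twice against the same group should, under these assumptions, resume from the last committed offset on the second run rather than from offset 0.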
Compare the JavaDocs of KafkaConsumer. It's a long read, but required to understand the subtle details of how to use the consumer correctly: https://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
Furthermore, if auto-commit is enabled, it will commit within poll (i.e., a call to poll() might commit the messages returned from the previous call to poll()) and not when you iterate through the returned messages. This also means that your commits will "jump" forward, like from committed offset 0 to 100 (if you received 100 messages by poll for a single partition).
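The "jump" can be illustrated with a toy model that does not use Kafka at all. ToyConsumer below is a hypothetical stand-in for a single-partition consumer, and it simplifies things by committing on every poll(), whereas the real client only commits inside poll() once the auto-commit interval has elapsed.

```java
import java.util.ArrayList;
import java.util.List;

public class AutoCommitSketch {

    /** Toy single-partition consumer; a simplification, not the Kafka API. */
    static class ToyConsumer {
        private final List<String> log;   // stands in for the partition's messages
        private final int maxPollRecords; // upper bound on records per poll
        private int position = 0;         // next offset to hand out
        private int committed = 0;        // last committed offset

        ToyConsumer(List<String> log, int maxPollRecords) {
            this.log = log;
            this.maxPollRecords = maxPollRecords;
        }

        List<String> poll() {
            // Auto-commit step: commit everything returned by the PREVIOUS poll.
            committed = position;
            int end = Math.min(log.size(), position + maxPollRecords);
            List<String> batch = new ArrayList<>(log.subList(position, end));
            position = end;
            return batch;
        }

        int committed() {
            return committed;
        }

        int position() {
            return position;
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        for (int i = 0; i < 150; i++) {
            log.add("message-" + i);
        }
        ToyConsumer consumer = new ToyConsumer(log, 100);

        int firstBatch = consumer.poll().size();
        System.out.println("after 1st poll: received " + firstBatch
                + ", committed " + consumer.committed());

        int secondBatch = consumer.poll().size();
        System.out.println("after 2nd poll: received " + secondBatch
                + ", committed " + consumer.committed());
    }
}
```

In this model the committed offset stays at 0 after the first poll and moves straight to 100 during the second one, regardless of how far the caller got while iterating over the first batch.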