I am implementing a Kafka consumer class to receive messages. I want to get only the new messages each time, so I set `enable.auto.commit` to `true`. However, the offset does not seem to change at all, even though the topic, consumer group, and partition have always been the same.
Here is my consumer code:
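```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

// Declaration assumed; the type of consumerConfig is not shown in the original
Properties consumerConfig = new Properties();
consumerConfig.put("bootstrap.servers", bootstrap);
consumerConfig.put("group.id", KafkaTestConstants.KAFKA_GROUP);
consumerConfig.put("enable.auto.commit", "true");
consumerConfig.put("auto.offset.reset", "earliest");
// The setting is named auto.commit.interval.ms; "auto.commit.interval" is not recognized
consumerConfig.put("auto.commit.interval.ms", "1000");
consumerConfig.put("key.deserializer", StringDeserializer.class.getName());
consumerConfig.put("value.deserializer", StringDeserializer.class.getName());

StringDeserializer deserializer = new StringDeserializer();
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig, deserializer, deserializer);

// Manually assign partition 0 of the topic (no automatic assignment via the group)
TopicPartition tp = new TopicPartition(KafkaTestConstants.KAFKA_TOPIC, 0);
List<TopicPartition> tps = Arrays.asList(tp);
kafkaConsumer.assign(tps);

// Offset of the next record this consumer will fetch from the partition
long offset = kafkaConsumer.position(tp);
System.out.println("Offset of partition: " + offset);

List<String> messages = new ArrayList<String>();
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
    System.out.println("Message received: " + record.value());
    messages.add(record.value());
}
// Use the same consumer instance that was created above
kafkaConsumer.commitAsync();
System.out.println("Offset committed.\n");
kafkaConsumer.close();
```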
No matter how many times I run it, the offset is always 0, so the consumer always receives all messages from the very beginning. What am I missing?
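One simplified way to read this symptom: when a consumer in a group starts on a partition, it resumes from the group's committed offset if one exists, and only otherwise falls back to `auto.offset.reset`. With `earliest` and no committed offset, every run starts at the first offset in the log. The sketch below is a toy model in plain Java, not the Kafka client API; the class `StartingOffsetModel` and its parameters are hypothetical, and it ignores the case where a committed offset is out of range (Kafka applies the reset policy then as well).

```java
import java.util.OptionalLong;

// Toy model (hypothetical, NOT the Kafka client API) of how a consumer
// picks its starting offset for one partition when it starts up.
class StartingOffsetModel {

    // Mirrors the two common values of auto.offset.reset
    enum ResetPolicy { EARLIEST, LATEST }

    /**
     * @param committed      offset committed by the consumer group, if any
     * @param policy         value of auto.offset.reset
     * @param logStartOffset first offset still stored in the partition
     * @param logEndOffset   offset the next produced record will receive
     */
    static long startingOffset(OptionalLong committed, ResetPolicy policy,
                               long logStartOffset, long logEndOffset) {
        if (committed.isPresent()) {
            // Resume where the group left off
            return committed.getAsLong();
        }
        // No committed offset: fall back to the reset policy
        return policy == ResetPolicy.EARLIEST ? logStartOffset : logEndOffset;
    }

    public static void main(String[] args) {
        // Nothing ever committed: every run starts from the beginning
        System.out.println(startingOffset(OptionalLong.empty(), ResetPolicy.EARLIEST, 0, 100));
        // A previous run committed 100: the next run resumes at 100
        System.out.println(startingOffset(OptionalLong.of(100), ResetPolicy.EARLIEST, 0, 100));
    }
}
```

Running `main` prints 0 and then 100: without a commit each run restarts from the beginning of the partition, while a committed offset of 100 makes the next run resume there.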
EDIT: Based on Matthias's answer, I decided to commit the offset manually. However, `commitSync()` hangs, and `commitAsync()` only sort of works; I explain the "sort of" below. Here is what the code does:
1. The producer sends 2 messages.
2. The consumer is initialized.
3. Print the current position.
4. Call `consumer.poll()`.
5. Print the received messages.
6. Call `consumer.commitAsync()`.
Here is how the code behaves. Say the topic already holds 100 messages, and the producer then sends 2 new ones. Before the consumer polls, the current position is reported as 102, whereas I expect it to be 100. As a result, no new messages are printed. It is almost as if the offset were advanced as soon as the producer sent the messages.
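For comparison, here is the offset bookkeeping the steps above would be expected to produce, written as a toy in-memory model rather than against the real client. The class `OffsetWalkthrough` and its methods are hypothetical; the model assumes a single partition, assumes a previous run had already committed offset 100, and treats "position" as the offset of the next record to fetch, which is how `KafkaConsumer.position()` is documented.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical, NOT the Kafka client API) of one partition
// plus one consumer group's committed offset.
class OffsetWalkthrough {
    private final List<String> log = new ArrayList<>(); // partition contents; index = offset
    private long committed;                             // group's committed offset
    private long position;                              // next offset this consumer fetches

    // Assumption: the first alreadyConsumed messages were consumed and committed earlier
    OffsetWalkthrough(int alreadyConsumed) {
        for (int i = 0; i < alreadyConsumed; i++) {
            log.add("old-" + i);
        }
        committed = alreadyConsumed;
    }

    // Producer appends a record at the end of the log
    void produce(String value) {
        log.add(value);
    }

    // Offset the next produced record would receive
    long logEndOffset() {
        return log.size();
    }

    // A starting consumer picks up from the committed offset
    long startConsumer() {
        position = committed;
        return position;
    }

    // Return every record from the current position to the end of the log
    List<String> poll() {
        List<String> batch = new ArrayList<>(log.subList((int) position, log.size()));
        position = log.size();
        return batch;
    }

    // Commit the current position (the offset after the last returned record)
    long commit() {
        committed = position;
        return committed;
    }

    public static void main(String[] args) {
        OffsetWalkthrough p = new OffsetWalkthrough(100);
        p.produce("new-0"); // producer sends 2 messages
        p.produce("new-1");
        System.out.println("log end offset: " + p.logEndOffset());
        System.out.println("position before poll: " + p.startConsumer());
        System.out.println("received: " + p.poll());
        System.out.println("committed: " + p.commit());
    }
}
```

Under these assumptions the model reports a log end offset of 102, a position of 100 before the poll, the two new messages from the poll, and a committed offset of 102 afterwards.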