I manually created the topic test with this command:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
and using this command:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
I inserted these records:
This is a message
This is another message
This is a message2
First, I consume messages through the command line like this:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
and all the records are shown successfully. Then I tried to implement a consumer in Java using this code:
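import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaSubscriber {
    public void consume() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test")); // assumed: implied by the alternative below
        // also with this command
        // consumer.subscribe(Arrays.asList("test"));
        System.out.println("Starting to read data...");
        try {
            while (true) {
                try {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    System.out.println("Number of records found: " + records.count());
                    for (ConsumerRecord<String, String> rec : records) {
                        System.out.println(rec.value()); // assumed body: print each record's value
                    }
                } catch (Exception ex) {
                    ex.printStackTrace(); // assumed handler
                }
            }
        } catch (Exception e) {
            e.printStackTrace(); // assumed handler
        } finally {
            consumer.close(); // assumed cleanup
        }
    }
}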
But the output is:
Starting to read data...
which means that it does not find any records in the topic test. I also tried publishing some records after the Java consumer had started, but got the same result. Any ideas what might be going wrong?
EDIT: After adding the following line:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
the consumer now reads only the new records that I write to the topic. It does not read all the records from the beginning.
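One possible explanation, offered as an assumption rather than a confirmed diagnosis: auto.offset.reset only takes effect when the consumer group has no committed offset for a partition. If earlier runs with the group id test-consumer-group already committed offsets at the end of the topic, the consumer would resume from there and "earliest" would be ignored. The sketch below builds the same configuration with a group id that has never been used, so the reset policy would apply. It uses the plain string keys that the ConsumerConfig constants resolve to, so it needs only the JDK; the class name, method name, and random group id suffix are hypothetical and not part of the original code.

```java
import java.util.Properties;
import java.util.UUID;

public class FromBeginningConfig {

    // Builds consumer properties that should make a new consumer start
    // from the earliest available offset. Names here are illustrative.
    public static Properties fromBeginningProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Assumption: a never-used group id has no committed offsets,
        // so the auto.offset.reset policy below is actually consulted.
        props.put("group.id", "test-consumer-group-" + UUID.randomUUID());
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = fromBeginningProps("localhost:9092");
        // Print the generated settings for inspection.
        System.out.println("group.id = " + props.getProperty("group.id"));
        System.out.println("auto.offset.reset = " + props.getProperty("auto.offset.reset"));
    }
}
```

The returned Properties could be passed to new KafkaConsumer<>(props) in place of the props built in consume(). A random group id is convenient for testing but leaves a new group behind on every run, so a fixed group id whose offsets have been reset may be preferable outside experiments.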