Kafka command-line consumer reads, but Java consumer cannot

Published 2019-08-31 13:03

Question:

I have manually created topic test with this command:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

and using this command:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

I inserted these records:

This is a message
This is another message
This is a message2

First, I consume messages through the command line like this:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

and all the records are successfully shown. Then, I try to implement a consumer in Java using this code:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaSubscriber {

    public void consume() {

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test"));
        // also tried: consumer.subscribe(Arrays.asList("test"));

        System.out.println("Starting to read data...");

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                System.out.println("Number of records found: " + records.count());
                for (ConsumerRecord<String, String> rec : records) {
                    System.out.println(rec.value());
                }
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}

But the output is:

Starting to read data...
0
0
0
0
0
....

Which means that it does not find any records in topic test. I also tried publishing some records after the Java consumer had started, but the result was the same. Any ideas what might be going wrong?


EDIT: After adding the following line:

 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

the consumer now reads only when I write new records to the topic. It does not read all the records from the beginning.

Answer 1:

By default, if no offsets have previously been committed for the group, the consumer starts at the end of the topics.

Hence if you are running it after having produced records, it won't receive them.

Notice that your kafka-console-consumer.sh command has the --from-beginning flag, which forces the consumer to instead start from the beginning of the topic.

One workaround, as suggested in a comment, is to set ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to earliest. However, I'd be careful with that setting, as your consumer will then consume from the start of the topics, which could be a lot of data in a real use case.

The easiest solution: now that you've run your consumer once and it has created a group, simply rerun the producer. When you then run the consumer again, it will pick up from its last committed position, which is just before the new producer messages.
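To see where the group actually stands, Kafka ships a tool that describes a group's committed offsets and lag. Assuming the broker and group id from the question:

```shell
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group test-consumer-group
```

The output lists, per partition, the group's committed offset next to the log-end offset; a non-zero LAG column is what the consumer will receive on its next run.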

On the other hand, if you mean to always reconsume all messages then you have 2 options:

  • explicitly call seekToBeginning() when your consumer starts, to move its position to the start of the topics

  • set auto.offset.reset to earliest and disable automatic offset commits by setting enable.auto.commit to false
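If you go the seekToBeginning() route, a minimal sketch could look like the following (assuming the broker and topic from the question; the group id replay-group is made up for illustration). The seek only works once partitions have actually been assigned, which is why it lives in the rebalance callback rather than right after subscribe():

```java
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ReplayConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "replay-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // nothing to do on revocation for this sketch
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // rewind every newly assigned partition to offset 0
                    consumer.seekToBeginning(partitions);
                }
            });

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> rec : records) {
                    System.out.println(rec.value());
                }
            }
        }
    }
}
```

Every restart of this consumer replays the whole topic, which is the behaviour the question's EDIT was missing.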