Why doesn't offset get updated when messages a

Posted 2019-09-14 08:03

I am implementing a Kafka consumer class to receive messages, and I want to get only the new messages each time. Therefore, I set enable.auto.commit to true. However, the offset does not seem to change at all, even though the topic, consumer group, and partition have always been the same.

Here is my consumer code:

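    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    // Method body; `bootstrap` and KafkaTestConstants come from the surrounding test class.
    Properties consumerConfig = new Properties();
    consumerConfig.put("bootstrap.servers", bootstrap);
    consumerConfig.put("group.id", KafkaTestConstants.KAFKA_GROUP);
    consumerConfig.put("enable.auto.commit", "true");
    consumerConfig.put("auto.offset.reset", "earliest");
    // The key is "auto.commit.interval.ms" ("auto.commit.interval" is not recognized); default is 5000 ms.
    consumerConfig.put("auto.commit.interval.ms", "1000");
    consumerConfig.put("key.deserializer", StringDeserializer.class.getName());
    consumerConfig.put("value.deserializer", StringDeserializer.class.getName());
    StringDeserializer deserializer = new StringDeserializer();
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig, deserializer, deserializer);

    // Manually assign partition 0 (no group-managed subscription).
    TopicPartition tp = new TopicPartition(KafkaTestConstants.KAFKA_TOPIC, 0);
    List<TopicPartition> tps = Arrays.asList(tp);
    kafkaConsumer.assign(tps);
    long offset = kafkaConsumer.position(tp);
    System.out.println("Offset of partition: " + offset);

    List<String> messages = new ArrayList<String>();
    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);

    for (ConsumerRecord<String, String> record : records) {
        System.out.println("Message received: " + record.value());
        messages.add(record.value());
    }

    kafkaConsumer.commitAsync();
    System.out.println("Offset committed.\n");
    kafkaConsumer.close();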

No matter how many times I run it, it always shows an offset of 0, so it always receives all messages from the very beginning. What am I missing?

EDIT: Based on Matthias's answer, I decided to commit the offset manually. However, commitSync() hangs, while commitAsync() "sort of" works; I explain the "sort of" below. Here is what the code does:

1. The producer sends 2 messages.
2. The consumer is initialized.
3. Print the current position.
4. consumer.poll();
5. Print the received messages.
6. consumer.commitAsync();

This is how the code behaves: say the partition already holds 100 messages, and the producer sends 2 new ones. Before the consumer polls, it shows the current position as 102, although it should be 100. Therefore, no new messages are printed. It is almost as if the offset were updated as soon as the producer sent the messages.
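
For reference when reading the printed positions above, the rule for where a consumer starts reading a partition can be sketched as a toy function. This is not Kafka code: StartPosition.initialPosition is a hypothetical name, and the model ignores the case where a committed offset is out of range (which also falls back to auto.offset.reset). The rule it encodes is that a valid committed offset for the group takes precedence, and auto.offset.reset only applies when there is none.

```java
// Toy model (not Kafka code): where a consumer starts reading a partition
// before its first poll(). Out-of-range committed offsets are not modelled.
final class StartPosition {
    private StartPosition() {}

    static long initialPosition(Long committedOffset, String autoOffsetReset,
                                long logStartOffset, long logEndOffset) {
        if (committedOffset != null) {
            // With a valid committed offset, auto.offset.reset is not consulted.
            return committedOffset;
        }
        switch (autoOffsetReset) {
            case "earliest":
                return logStartOffset; // oldest record still in the log
            case "latest":
                return logEndOffset;   // only records produced from now on
            default:
                // Hypothetical stand-in for the error the real client raises.
                throw new IllegalStateException(
                        "no committed offset and auto.offset.reset=" + autoOffsetReset);
        }
    }
}
```

Under this rule, with auto.offset.reset=earliest and a log that starts at offset 0, a position of 0 on every run is what you would expect if no offset is ever committed for the group.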

1 Answer
做个烂人
Reply #2 · 2019-09-14 08:37

Auto commit only works if you use consumer group management, and for this you need to "subscribe" to a topic rather than "assign" partitions manually.
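
As a sketch of the group-managed setup described above, the consumer settings might be assembled as follows. The class and method names (AutoCommitConsumerConfig, build) are hypothetical; the property keys are the standard Kafka consumer configuration names, written as plain strings so the snippet only needs java.util.

```java
import java.util.Properties;

// Hypothetical helper: consumer settings for group-managed consumption with auto-commit.
final class AutoCommitConsumerConfig {
    private AutoCommitConsumerConfig() {}

    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // The group id identifies whose committed offsets are stored and read back.
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        // Note the ".ms" suffix on the interval key.
        props.put("auto.commit.interval.ms", "1000");
        // Only used when the group has no committed offset for a partition.
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

The resulting Properties would be passed to new KafkaConsumer<String, String>(props), followed by consumer.subscribe(Collections.singletonList(topic)) in place of assign(...), with poll() called repeatedly in a loop.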

Compare the JavaDocs of KafkaConsumer. They are a long read, but necessary to understand the subtle details of how to use the consumer correctly: https://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

Furthermore, if auto-commit is enabled, commits happen within poll() (i.e., a call to poll() might commit the messages returned by the previous call to poll()), not while you iterate through the returned messages. This also means that your commits will "jump" forward, for example from committed offset 0 to 100 (if a single poll() returned 100 messages for a single partition).
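
The "commit inside poll()" behavior can be pictured with a toy, single-partition model. It is not Kafka code: AutoCommitModel and its fields are hypothetical, it assumes the auto-commit interval has always elapsed when poll() is called, and it leaves out the interval timer and close(). The maxRecordsPerPoll parameter loosely stands in for max.poll.records.

```java
// Toy model of auto-commit inside poll() for a single partition (not Kafka code).
// Assumption: the auto-commit interval has always elapsed, so every poll() commits.
final class AutoCommitModel {
    private final long logEndOffset;     // offset the next produced record would get
    private final int maxRecordsPerPoll; // loosely plays the role of max.poll.records
    private long position = 0;           // offset of the next record to fetch
    private long committed = 0;          // committed offset (0 = nothing committed yet here)

    AutoCommitModel(long logEndOffset, int maxRecordsPerPoll) {
        this.logEndOffset = logEndOffset;
        this.maxRecordsPerPoll = maxRecordsPerPoll;
    }

    // Returns how many records this poll() hands back to the caller.
    int poll() {
        // 1. Commit first: the current position only covers records that
        //    earlier poll() calls already returned.
        committed = position;
        // 2. Then fetch the next batch and move the position past it.
        long fetched = Math.min(maxRecordsPerPoll, logEndOffset - position);
        position += fetched;
        return (int) fetched;
    }

    long committed() { return committed; }

    long position() { return position; }
}
```

With 100 records in the partition, the first poll() returns all 100 but leaves the committed offset at 0; the second poll() commits 100 before fetching anything, which is the jump from 0 to 100 described above.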
