Apache Kafka: Exactly Once in Version 0.10

2020-07-23 04:34发布

问题:

To achieve exactly-once processing of messages by Kafka consumer I am committing one message at a time, like below

public void commitOneRecordConsumer(long seconds) {
        KafkaConsumer<String, String> consumer = consumerConfigFactory.getConsumerConfig();

        try {

            while (running) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                try {
                    for (ConsumerRecord<String, String> record : records) {

                        processingService.process(record);

                        consumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(),record.partition()), new OffsetAndMetadata(record.offset() + 1)));

                        System.out.println("Committed Offset" + ": " + record.offset());

                    }
                } catch (CommitFailedException e) {
                    // application specific failure handling
                }
            }
        } finally {
            consumer.close();
        }
    }

The above code delegates the processing of message asynchronously to another class below.

@Service
public class ProcessingService {

    @Async
    public void process(ConsumerRecord<String, String> record) throws InterruptedException {
        Thread.sleep(5000L);
        Map<String, Object> map = new HashMap<>();
        map.put("partition", record.partition());
        map.put("offset", record.offset());
        map.put("value", record.value());
        System.out.println("Processed" + ": " + map);
    }

}

However, this still does not guarantee exactly-once delivery, because if the processing fails, it might still commit other messages and the previous messages will never be processed and committed, what are my options here?

回答1:

Original answer for 0.10.2 and older releases (for 0.11 and later releases see answer blow)

Currently, Kafka cannot provide exactly-once processing out-of-the box. You can either have at-least-once processing if you commit messages after you successfully processed them, or you can have at-most-once processing if you commit messages directly after poll() before you start processing.

(see also paragraph "Delivery Guarantees" in http://docs.confluent.io/3.0.0/clients/consumer.html#synchronous-commits)

However, at-least-once guarantee is "good enough" if your processing is idempotent, i.e., the final result will be the same even if you process a record twice. Examples for idempotent processing would be adding a message to a key-value store. Even if you add the same record twice, the second insert will just replace the first current key-value-pair and the KV-store will still have the correct data in it.

In your example code above, you update a HashMap and this would be an idempotent operation. Even if your might have an inconsistent state in case of failure if for example only two put calls are executed before the crash. However, this inconsistent state would be fixed on reprocessing the same record again.

The call to println() is not idempotent though because this is an operation with "side effect". But I guess the print is for debugging purpose only.

As an alternative, you would need to implement transaction semantics in your user code which requires to "undo" (partly executed) operation in case of failure. In general, this is a hard problem.

Update for Apache Kafka 0.11+ (for pre 0.11 releases see answer above)

Since 0.11, Apache Kafka supports idempotent producers, transactional producer, and exactly-once-processing using Kafka Streams. It also adds a "read_committed" mode to the consumer to only read committed messages (and to drop/filter aborted messages).

  • https://kafka.apache.org/documentation/#semantics
  • https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
  • https://www.confluent.io/blog/transactions-apache-kafka/
  • https://www.confluent.io/blog/enabling-exactly-kafka-streams/


回答2:

Apache Kafka 0.11.0.0 has been just released, it supports exactly once delivery now.

http://kafka.apache.org/documentation/#upgrade_11_exactly_once_semantics

https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging



回答3:

I think exactly once processing can be achieved with kafka 0.10.x itself. But there's some catch. I'm sharing the high level idea from this book. Relevant contents can be found in section: Seek and Exactly Once Processing in chapter 4: Kafka Consumers - Reading Data from Kafka. You can view the contents of that book with a (free) safaribooksonline account, or buy it once it's out, or maybe get it from other sources, which we shall not speak about.

Idea:

Think about this common scenario: Your application reads events from Kafka, processes the data, and then stores the results in a database. Suppose that we really don’t want to lose any data, nor do we want to store the same results in the database twice.

It's doable if there is a way to store both the record and the offset in one atomic action. Either both the record and the offset are committed, or neither of them are committed. To achieve that, we need to write both the record and the offset to the database, in one transaction. Then we’ll know that either we are done with the record and the offset is committed or we are not, and the record will be reprocessed.

Now the only problem is: if the record is stored in a database and not in Kafka, how will our consumer know where to start reading when it is assigned a partition? This is exactly what seek() can be used for. When the consumer starts or when new partitions are assigned, it can look up the offset in the database and seek() to that location.

Sample code from the book:

public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        commitDBTransaction(); 
    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for(TopicPartition partition: partitions)
        consumer.seek(partition, getOffsetFromDB(partition)); 
    }
}

consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer));
consumer.poll(0);

for (TopicPartition partition: consumer.assignment())
    consumer.seek(partition, getOffsetFromDB(partition));   

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        processRecord(record);
        storeRecordInDB(record);
        storeOffsetInDB(record.topic(), record.partition(), record.offset()); 
    }
    commitDBTransaction();
}