KafkaProducer sendOffsetsToTransaction need offset

Posted 2019-08-21 09:54

I'm trying to use a transaction in a Kafka Processor to make sure I don't process the same message twice. Given a message (A), I need to create a list of messages that will be produced to another topic in a transaction, and I want to commit the original message (A) in the same transaction. In the documentation I found the Producer method sendOffsetsToTransaction, which seems to commit an offset as part of a transaction, so the offset is committed only if the transaction succeeds. This is the code inside the process() method of my Processor:

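    import org.apache.kafka.clients.consumer.OffsetAndMetadata
    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.kafka.common.TopicPartition
    import scala.collection.JavaConverters._ // provides .asJava

    producer.beginTransaction()
    val topicPartition    = new TopicPartition(this.context().topic(), this.context().partition())
    val offsetAndMetadata = new OffsetAndMetadata(this.context().offset())
    val map               = Map(topicPartition -> offsetAndMetadata).asJava
    producer.sendOffsetsToTransaction(map, "consumer-group-id")
    items.foreach(x => producer.send(new ProducerRecord("items_topic", x.key, x.value)))
    producer.commitTransaction()
    // Thrown on purpose after the commit, to check whether message (A) is reprocessed on restart
    throw new RuntimeException("expected exception")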

Unfortunately, with this code (which obviously fails on every execution), the processed message (A) is reprocessed each time I restart the application after the exception.

I managed to make it work by adding 1 to the offset returned by this.context().offset() and redefining the val offsetAndMetadata this way:

    val offsetAndMetadata = new OffsetAndMetadata(this.context().offset() + 1)

Is this the normal behaviour, or am I doing something wrong?

Thank you :)

2 Answers
#2 · 2019-08-21 10:18

Your code with the + 1 is correct.

The offsets you commit are the offsets of the messages you want to read next, not the offsets of the messages you read last.

Compare: https://github.com/apache/kafka/blob/41e4e93b5ae8a7d221fce1733e050cb98ac9713c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L346
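
To make the "commit the next offset to read" rule concrete, here is a minimal sketch in plain Java. It is a toy model of a single partition and does not use the Kafka client API; the class name CommittedOffsetDemo and the method recordsAfterRestart are hypothetical names invented for this illustration. The only assumption it encodes is the rule stated above: after a restart, reading resumes at the committed offset.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of one partition and one consumer group; NOT the Kafka client API.
public class CommittedOffsetDemo {

    // Returns the records a restarted consumer would read, assuming that
    // reading resumes AT the committed offset (the rule described above).
    static List<String> recordsAfterRestart(List<String> partition, long committedOffset) {
        return new ArrayList<>(partition.subList((int) committedOffset, partition.size()));
    }

    public static void main(String[] args) {
        List<String> partition = Arrays.asList("A", "B", "C"); // offsets 0, 1, 2
        long processedOffset = 0; // message A has just been processed

        // Committing the offset of the processed record: A is read again.
        System.out.println(recordsAfterRestart(partition, processedOffset));     // [A, B, C]

        // Committing processed offset + 1: reading resumes at B.
        System.out.println(recordsAfterRestart(partition, processedOffset + 1)); // [B, C]
    }
}
```

In this model, committing this.context().offset() corresponds to the first call and reproduces the reprocessing of (A) seen in the question, while committing offset() + 1 corresponds to the second.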

迷人小祖宗
#3 · 2019-08-21 10:35

Instead of adding 1 to the offset, you can use:

    long newOffset = consumer.position(topicPartition);

This returns the offset of the next record that will be given out. It will be one larger than the highest offset the consumer has seen in that partition.
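
As a rough illustration of "the offset of the next record that will be given out", the sketch below models a consumer on one partition in plain Java. It is a toy stand-in, not the Kafka client: the class PositionDemo and its poll and position methods are hypothetical and only mimic the behaviour described above, under the assumption that the position advances past every record handed out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy stand-in for a consumer on a single partition; NOT the Kafka client API.
public class PositionDemo {
    private final List<String> partition;
    private long position = 0; // offset of the next record poll() will hand out

    PositionDemo(List<String> partition) {
        this.partition = partition;
    }

    // Hands out up to maxRecords records and advances the position past them.
    List<String> poll(int maxRecords) {
        int end = (int) Math.min(partition.size(), position + maxRecords);
        List<String> batch = new ArrayList<>(partition.subList((int) position, end));
        position = end;
        return batch;
    }

    long position() {
        return position;
    }

    public static void main(String[] args) {
        PositionDemo consumer = new PositionDemo(Arrays.asList("A", "B", "C"));
        consumer.poll(1);                        // hands out A (offset 0)
        System.out.println(consumer.position()); // 1, i.e. the offset of A plus 1
        consumer.poll(2);                        // hands out B and C (offsets 1 and 2)
        System.out.println(consumer.position()); // 3, one past the last record handed out
    }
}
```

Note that in this model the position matches "processed offset + 1" only when every record already handed out has been processed. If several records are handed out at once, the position points past the whole batch, so it is worth checking whether that applies before committing it from inside a per-record handler.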
