I'm trying to use a transaction in a Kafka Processor to make sure I don't process the same message twice. Given a message (A), I need to create a list of messages that will be produced to another topic in a transaction, and I want to commit the offset of the original message (A) in the same transaction. In the documentation I found the Producer method sendOffsetsToTransaction, which seems to commit an offset as part of a transaction, so that the commit only takes effect if the transaction succeeds. This is the code inside the process() method of my Processor:
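// start one transaction for the output records and the input offset
producer.beginTransaction()
// partition and offset of the record (A) currently being processed
val topicPartition = new TopicPartition(this.context().topic(), this.context().partition())
val offsetAndMetadata = new OffsetAndMetadata(this.context().offset())
val map = Map(topicPartition -> offsetAndMetadata).asJava
// commit the input offset as part of the transaction
producer.sendOffsetsToTransaction(map, "consumer-group-id")
items.foreach(x => producer.send(new ProducerRecord("items_topic", x.key, x.value)))
producer.commitTransaction()
// simulate a crash after the transaction has been committed
throw new RuntimeException("expected exception")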
Unfortunately, with this code (which obviously fails on every execution), the message (A) is processed again every time I restart the application after the exception.
I managed to make it work by adding 1 to the offset returned by this.context().offset(), redefining val offsetAndMetadata like this:
val offsetAndMetadata = new OffsetAndMetadata(this.context().offset() + 1)
Is this the normal behaviour, or am I doing something wrong?
Thank you :)
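The atomicity that sendOffsetsToTransaction relies on can be illustrated with a toy, in-memory model. Everything below (ToyBroker, ToyTransaction and their methods) is hypothetical and does not use or reproduce the real Kafka client API; it only mimics the idea that output records and the consumed offset become visible together on commit, or not at all on abort. It also suggests why the exception thrown after commitTransaction() in the question cannot undo anything: once committed, whatever offset was sent is the one the group restarts from.

```scala
// Toy, in-memory model of a transactional consume-transform-produce step.
// All names are hypothetical; this is NOT the Kafka client API.
final class ToyBroker {
  var outputTopic: Vector[String] = Vector.empty // records visible to read_committed consumers
  var committedOffset: Long = 0L                 // consumer group's committed input offset
}

final class ToyTransaction(broker: ToyBroker) {
  private var pendingRecords = Vector.empty[String]
  private var pendingOffset: Option[Long] = None

  // Buffer an output record; nothing is visible until commit().
  def send(value: String): Unit = pendingRecords = pendingRecords :+ value

  // Buffer the input offset to commit as part of this transaction.
  def sendOffset(offset: Long): Unit = pendingOffset = Some(offset)

  // Make the buffered records and the offset visible together.
  def commit(): Unit = {
    broker.outputTopic = broker.outputTopic ++ pendingRecords
    pendingOffset.foreach(o => broker.committedOffset = o)
    pendingRecords = Vector.empty
    pendingOffset = None
  }

  // Discard everything buffered in this transaction.
  def abort(): Unit = {
    pendingRecords = Vector.empty
    pendingOffset = None
  }
}
```

With a real KafkaProducer the abort path corresponds to abortTransaction(); the model only captures visibility, not producer fencing, idempotence or failure recovery.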
Your code (with the +1) is correct.
The offsets you commit are the offsets of the messages you want to read next, not the offsets of the messages you read last.
Compare the offset commit in Kafka Streams' own StreamTask: https://github.com/apache/kafka/blob/41e4e93b5ae8a7d221fce1733e050cb98ac9713c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L346
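A toy model makes the convention concrete: after a restart, a consumer resumes at the committed offset itself, so committing the offset of the record you just processed hands that same record out again. The Record type and the helpers below are hypothetical and only model this rule; they are not part of any Kafka API.

```scala
// Toy model of one partition: the committed offset is where a restarted
// consumer starts reading (hypothetical helpers, not the Kafka API).
object OffsetCommitModel {
  final case class Record(offset: Long, value: String)

  // Records handed out after a restart, given the group's committed offset.
  def resumeFrom(log: Vector[Record], committed: Long): Vector[Record] =
    log.filter(_.offset >= committed)

  // The convention described above: commit the offset of the NEXT record to read.
  def offsetToCommit(lastProcessed: Record): Long = lastProcessed.offset + 1
}
```

For a log x, A, y at offsets 0, 1, 2, committing 1 (A's own offset) resumes at A again, while committing 2 resumes at y.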
Instead of adding 1 to the offset you can use
This returns the offset of the next record that will be given out; it is one larger than the highest offset the consumer has seen in that partition.
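When offsets are committed for a batch that spans several partitions, the same rule applies per partition: commit one more than the highest offset processed in each. The sketch below computes such a map from plain values; Processed and nextOffsetsPerPartition are hypothetical stand-ins for the TopicPartition to OffsetAndMetadata map passed to sendOffsetsToTransaction, not part of the Kafka API.

```scala
// Hypothetical stand-in for the coordinates of a processed record.
final case class Processed(topic: String, partition: Int, offset: Long)

object NextOffsets {
  // For each (topic, partition): highest processed offset + 1.
  def nextOffsetsPerPartition(batch: Seq[Processed]): Map[(String, Int), Long] =
    batch
      .groupBy(r => (r.topic, r.partition))
      .map { case (tp, records) => tp -> (records.map(_.offset).max + 1) }
}
```

For example, records at offsets 5 and 7 in partition 0 and at offset 9 in partition 1 yield 8 for partition 0 and 10 for partition 1.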