KafkaProducer sendOffsetsToTransaction needs offset + 1 to successfully commit the current offset

Published 2019-08-21 10:30

Question:

I'm trying to use a transaction in a Kafka Processor to make sure I don't reprocess the same message twice. Given a message (A), I need to create a list of messages that will be produced to another topic, and I want to commit the offset of the original message (A) in the same transaction. In the documentation I found the producer method sendOffsetsToTransaction, which seems to commit an offset as part of the transaction, so that it takes effect only if the transaction succeeds. This is the code inside the process() method of my Processor:

    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.clients.consumer.OffsetAndMetadata
    import org.apache.kafka.clients.producer.ProducerRecord
    import scala.collection.JavaConverters._

    producer.beginTransaction()
    // Commit the offset of the record currently being processed as part of the transaction
    val topicPartition    = new TopicPartition(this.context().topic(), this.context().partition())
    val offsetAndMetadata = new OffsetAndMetadata(this.context().offset())
    val map               = Map(topicPartition -> offsetAndMetadata).asJava
    producer.sendOffsetsToTransaction(map, "consumer-group-id")
    items.foreach(x => producer.send(new ProducerRecord("items_topic", x.key, x.value)))
    producer.commitTransaction()
    throw new RuntimeException("expected exception") // deliberate, to test the rollback

Unfortunately, with this code (which obviously fails on every execution) the processed message (A) is reprocessed each time I restart the application after the exception.

I managed to make it work by adding +1 to the offset returned by this.context().offset() and redefining offsetAndMetadata like this:

    val offsetAndMetadata = new OffsetAndMetadata(this.context().offset() + 1)

Is this the normal behaviour, or am I doing something wrong?

Thank you :)

Answer 1:

Your code is correct.

The offsets you commit are the offsets of the messages you want to read next (not the offsets of the messages you did read last).

Compare: https://github.com/apache/kafka/blob/41e4e93b5ae8a7d221fce1733e050cb98ac9713c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L346
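This convention can be illustrated without Kafka at all. Below is a toy sketch in plain Java (the list stands in for a partition; the method name is made up for illustration): a consumer resuming from a committed offset starts reading *at* that offset, so committing the offset of the record you just processed replays it, while committing that offset + 1 resumes at the next record.

```java
import java.util.List;

public class CommitSemanticsDemo {
    // The committed offset is interpreted as the FIRST record to read after a restart,
    // mirroring how Kafka resumes a consumer group from its committed offset.
    static String firstAfterRestart(List<String> partition, long committedOffset) {
        return partition.get((int) committedOffset);
    }

    public static void main(String[] args) {
        List<String> partition = List.of("A", "B", "C");
        long processed = 0; // we just processed "A", which sits at offset 0

        // Committing the offset of the record itself replays it:
        System.out.println(firstAfterRestart(partition, processed));     // A (reprocessed!)
        // Committing processed + 1 resumes at the next record:
        System.out.println(firstAfterRestart(partition, processed + 1)); // B
    }
}
```

This is exactly why the `+ 1` in the question is required: `context().offset()` is the offset of the record being processed, not the offset to resume from.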



Answer 2:

Instead of adding 1 to the offset, you can use:

    long newOffset = consumer.position(topicPartition);

This returns the offset of the next record that will be given out: it is one larger than the highest offset the consumer has seen in that partition, so it can be passed to sendOffsetsToTransaction without any manual arithmetic.
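The relationship between the last record seen and `position()` can be sketched with a tiny stdlib-only simulation (no Kafka involved; the class and methods below are invented for illustration, mimicking the documented contract of `KafkaConsumer.position(TopicPartition)`):

```java
// Toy stand-in for a consumer on a single partition. poll() hands out the next
// record's offset; position() reports the offset of the NEXT record to be
// returned, i.e. one past the last record seen -- the documented behaviour of
// KafkaConsumer.position(topicPartition).
public class PositionDemo {
    private long lastSeenOffset = -1; // nothing consumed yet

    long poll() {
        lastSeenOffset += 1;
        return lastSeenOffset;
    }

    long position() {
        return lastSeenOffset + 1; // next offset to be handed out
    }

    public static void main(String[] args) {
        PositionDemo consumer = new PositionDemo();
        long seen = consumer.poll(); // process the record at offset 0
        // position() is already seen + 1, so it is directly committable:
        System.out.println(consumer.position() == seen + 1); // true
    }
}
```

In other words, `consumer.position(topicPartition)` already encodes the "+ 1" that the question added by hand.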