Manual Acknowledgement of Messages : Spring Cloud

Posted 2019-04-12 23:35

The scenario I want to implement is this: consume a message from Kafka and process it; if some condition fails, I do not want to acknowledge the message. I found the following in the Spring Cloud Stream reference documentation:

autoCommitOffset: Whether to autocommit offsets when a message has been processed. If set to false, an Acknowledgment header will be available in the message headers for late acknowledgment.

Default: true.

My question is: after setting autoCommitOffset to false, how can I acknowledge a message? A code example would be hugely appreciated.

1 answer
对你真心纯属浪费
Reply #2 · 2019-04-13 00:10

I've provided an answer to the question here: https://github.com/spring-cloud/spring-cloud-stream/issues/575

Essentially it comes down to setting spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false

and then handling the acknowledgment header:

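import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;

@SpringBootApplication
@EnableBinding(Sink.class)
public class ManuallyAcknowdledgingConsumer {

    public static void main(String[] args) {
        SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void process(Message<?> message) {
        System.out.println(message.getPayload());
        // The header is only present when autoCommitOffset is set to false.
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (acknowledgment != null) {
            System.out.println("Acknowledgment provided");
            // Acknowledge this message manually.
            acknowledgment.acknowledge();
        }
    }
}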
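The listener above always acknowledges, while the question asks about skipping the acknowledgment when a condition fails. A minimal sketch of that decision logic follows. To keep it runnable without Spring on the classpath, `Ack` is a hypothetical local stand-in for `org.springframework.kafka.support.Acknowledgment`, and `handle` and the `isValid` check are illustrative names that do not come from the original answer.

```java
import java.util.function.Predicate;

// Sketch only: Ack is a local stand-in for
// org.springframework.kafka.support.Acknowledgment, so this compiles without Spring.
class ConditionalAckSketch {

    /** Hypothetical stand-in for Spring Kafka's Acknowledgment. */
    interface Ack {
        void acknowledge();
    }

    /**
     * Checks the payload and acknowledges only if the check passes.
     * Returns true when the message was acknowledged.
     */
    static boolean handle(String payload, Ack ack, Predicate<String> isValid) {
        if (!isValid.test(payload)) {
            // Condition failed: skip acknowledge() for this message.
            return false;
        }
        if (ack != null) { // the header is absent when autoCommitOffset is true
            ack.acknowledge();
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        int[] acked = {0};
        Ack counting = () -> acked[0]++;
        Predicate<String> nonEmpty = p -> p != null && !p.isEmpty();

        handle("order-1", counting, nonEmpty); // passes the check, acknowledged
        handle("", counting, nonEmpty);        // fails the check, not acknowledged
        System.out.println("acknowledged: " + acked[0]);
    }
}
```

In the real listener, the same branch would wrap the `acknowledgment.acknowledge()` call. One caveat worth keeping in mind: Kafka offsets are positional, so acknowledging a later record on the same partition also moves the committed offset past a skipped one. Skipping the acknowledgment does not by itself cause the record to be redelivered while the consumer keeps running.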