SeekToCurrentErrorHandler: DeadLetterPublishingRec

Posted 2020-07-26 11:09

Question:

I am trying to write a Kafka consumer using the spring-kafka 2.3.0.M2 library. To handle runtime errors I am using SeekToCurrentErrorHandler with a DeadLetterPublishingRecoverer as its recoverer. This works fine when my consumer code throws an exception, but it fails when the message cannot be deserialized.

I tried implementing ErrorHandler myself, and that worked, but with that approach I end up writing the DLT (dead-letter topic) publishing code myself, which I want to avoid.

Below are my Kafka properties, followed by the container factory configuration:

spring:
   kafka:
     consumer:
        bootstrap-servers: localhost:9092
        group-id: group_id
        auto-offset-reset: latest
        key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
        value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
        properties:
          spring.json.trusted.packages: com.mypackage
          spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
          spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer

@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory,
        KafkaTemplate<Object, Object> template) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    // maxFailures is assumed to be defined elsewhere in the configuration class
    factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), maxFailures));
    return factory;
}

Answer 1:

It works fine for me (note that Boot will auto-configure the error handler)...

@SpringBootApplication
public class So56728833Application {

    public static void main(String[] args) {
        SpringApplication.run(So56728833Application.class, args);
    }

    @Bean
    public SeekToCurrentErrorHandler errorHandler(KafkaTemplate<String, String> template) {
        SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 3);
        eh.setClassifier( // retry for all except deserialization exceptions
                new BinaryExceptionClassifier(Collections.singletonList(DeserializationException.class), false));
        return eh;
    }

    @KafkaListener(id = "so56728833", topics = "so56728833")
    public void listen(Foo in) {
        System.out.println(in);
        if (in.getBar().equals("baz")) {
            throw new IllegalStateException("Test retries");
        }
    }

    @KafkaListener(id = "so56728833dlt", topics = "so56728833.DLT")
    public void listenDLT(Object in) {
        System.out.println("Received from DLT: " + (in instanceof byte[] ? new String((byte[]) in) : in));
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so56728833").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic dlt() {
        return TopicBuilder.name("so56728833.DLT").partitions(1).replicas(1).build();
    }

    public static class Foo {

        private String bar;

        public Foo() {
            super();
        }

        public Foo(String bar) {
            this.bar = bar;
        }

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }

    }

}

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      enable-auto-commit: false
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
      properties:
        spring.json.trusted.packages: com.example
        spring.deserializer.key.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.json.value.default.type: com.example.So56728833Application$Foo
    producer:
      key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

logging:
  level:
    org.springframework.kafka: trace
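
The delegate settings matter because ErrorHandlingDeserializer2 wraps the configured delegate and captures its exception instead of letting the consumer fail, so the error handler gets a chance to see the bad record. Here is a rough plain-Java sketch of that wrapping idea, with no Spring or Kafka dependency. `Result`, `wrap` and `parseBar` are hypothetical names made up for illustration; they are not the library's API, and the real class reports the failure differently.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

class DelegatingDeserializerSketch {

    /** Either a deserialized value, or the failure together with the raw bytes. */
    static final class Result<T> {
        final T value;
        final RuntimeException error;
        final byte[] raw;

        Result(T value, RuntimeException error, byte[] raw) {
            this.value = value;
            this.error = error;
            this.raw = raw;
        }

        boolean failed() {
            return this.error != null;
        }
    }

    /** Wraps a delegate so that its exceptions are captured instead of propagated. */
    static <T> Function<byte[], Result<T>> wrap(Function<byte[], T> delegate) {
        return data -> {
            try {
                return new Result<>(delegate.apply(data), null, data);
            }
            catch (RuntimeException e) {
                // Keep the raw bytes so a recoverer could still publish them somewhere.
                return new Result<>(null, e, data);
            }
        };
    }

    /** Hypothetical delegate: accepts only payloads shaped like {"bar":"..."}. */
    static String parseBar(byte[] data) {
        String text = new String(data, StandardCharsets.UTF_8);
        String prefix = "{\"bar\":\"";
        String suffix = "\"}";
        if (text.length() < prefix.length() + suffix.length()
                || !text.startsWith(prefix) || !text.endsWith(suffix)) {
            throw new IllegalArgumentException("Cannot parse: " + text);
        }
        return text.substring(prefix.length(), text.length() - suffix.length());
    }

    public static void main(String[] args) {
        Function<byte[], Result<String>> deserializer = wrap(DelegatingDeserializerSketch::parseBar);
        for (String payload : new String[] { "badJSON", "{\"bar\":\"baz\"}" }) {
            Result<String> result = deserializer.apply(payload.getBytes(StandardCharsets.UTF_8));
            System.out.println(payload + " -> "
                    + (result.failed() ? "failed: " + result.error.getMessage() : result.value));
        }
    }
}
```

If the delegate cannot fail for a given payload (a String delegate accepts any bytes, for example), there is nothing for the wrapper to capture, so it is worth checking which delegate is actually configured.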

I have 3 records in the topic:

"badJSON"
"{\"bar\":\"baz\"}"
"{\"bar\":\"qux\"}"

I see the first one going directly to the DLT, and the second one goes there after 3 attempts.
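
To make that outcome easier to follow, here is a small plain-Java simulation of the retry-versus-dead-letter decision. It is only a sketch under assumptions: `NotRetryableException` stands in for DeserializationException, `process` and `Outcome` are hypothetical names, deserialization and listener logic are merged into one `listen` method, and no real Kafka topic is involved. It is not how the error handler is implemented internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class RetryThenDeadLetterSketch {

    /** Hypothetical stand-in for DeserializationException: never worth retrying. */
    static class NotRetryableException extends RuntimeException {
        NotRetryableException(String message) {
            super(message);
        }
    }

    /** Delivery attempts per record, plus the records that were dead-lettered. */
    static final class Outcome {
        final Map<String, Integer> deliveries = new LinkedHashMap<>();
        final List<String> deadLettered = new ArrayList<>();
    }

    /** Delivers each record until it succeeds, is classified as fatal, or fails maxFailures times. */
    static Outcome process(List<String> records, Consumer<String> listener, int maxFailures) {
        Outcome outcome = new Outcome();
        for (String record : records) {
            int attempts = 0;
            while (true) {
                attempts++;
                try {
                    listener.accept(record);
                    break; // processed successfully
                }
                catch (NotRetryableException e) {
                    outcome.deadLettered.add(record); // classified as fatal: no retries
                    break;
                }
                catch (RuntimeException e) {
                    if (attempts >= maxFailures) {
                        outcome.deadLettered.add(record); // retries exhausted
                        break;
                    }
                }
            }
            outcome.deliveries.put(record, attempts);
        }
        return outcome;
    }

    /** Mimics the test listener: malformed input is fatal, "baz" always fails. */
    static void listen(String record) {
        if (!record.startsWith("{")) {
            throw new NotRetryableException("Cannot deserialize: " + record);
        }
        if (record.contains("\"baz\"")) {
            throw new IllegalStateException("Test retries");
        }
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("badJSON", "{\"bar\":\"baz\"}", "{\"bar\":\"qux\"}");
        Outcome outcome = process(records, RetryThenDeadLetterSketch::listen, 3);
        System.out.println("Deliveries: " + outcome.deliveries);
        System.out.println("Dead-lettered: " + outcome.deadLettered);
    }
}
```

In this sketch the classifier is simply the order of the catch blocks; in the answer's configuration that role is played by the BinaryExceptionClassifier passed to setClassifier.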