Spring Kafka SeekToCurrentErrorHandler maxFailures

Posted 2020-03-08 06:54

Question:

I am using Spring Kafka 2.2.7 and my consumer configuration is as follows:

import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2;

@Slf4j
@Configuration
@EnableKafka
public class KafkaConfiguration {

  // Placeholder value; the original post does not show how groupId is defined
  private final String groupId = "demo-consumer-stream-group";

  @Bean
  ConcurrentKafkaListenerContainerFactory<String, Customer> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Customer> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // A concurrency level lower than the topic's partition count triggers
    // the infinite-retry behaviour described below
    factory.setConcurrency(1);
    SeekToCurrentErrorHandler errorHandler =
        new SeekToCurrentErrorHandler((record, exception) -> {
          // Invoked after a record has failed maxFailures (here 1) times
          log.info("***in error handler data, {}", record);
        }, 1);
    factory.setErrorHandler(errorHandler);
    return factory;
  }

  @Bean
  public ConsumerFactory<String, Customer> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  }

  @Bean
  public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // Wrap the Avro deserializer so deserialization failures reach the error handler
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
    props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);

    props.put("schema.registry.url", "http://127.0.0.1:8081");
    props.put("specific.avro.reader", "true");

    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    return props;
  }
}


import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class KafkaConsumerService {

  @KafkaListener(id = "demo-consumer-stream-group", topics = "kafka-demo-avro")
  public void process(ConsumerRecord<String, Customer> record) {
    log.info("Customer key: {} and value: {}", record.key(), record.value());
    log.info("topic: {}, partition: {}, offset: {}", record.topic(), record.partition(), record.offset());
    // Always fail so the error handler's retry behaviour can be observed
    throw new RuntimeException("force to retry");
  }
}

So when an exception is thrown in my listener, the consumer retries the failed message forever, despite the maxFailures I configured, whenever the concurrency level is lower than the topic's partition count.

maxFailures is only honoured if I send the messages one at a time, with at least a one-second interval between them. If I send them as a batch, the failed messages are retried indefinitely until I restart the consumer, after which it behaves properly.

Steps to reproduce:

1. Create a topic with more than one partition, for example 3 or 6.
2. In the Spring Kafka config, set the concurrency level to 1.
3. For SeekToCurrentErrorHandler, set maxFailures to a positive value, for example 3.
4. Send a dozen messages to the topic (see the producer sketch below).
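For step 4, a minimal producer sketch that sends a batch quickly enough to hit the problem. This is hypothetical scaffolding, not part of the original post: the BatchSender class name is mine, and Customer.newBuilder().setName(...) assumes the Avro-generated Customer schema has a "name" field; adjust to your schema.

import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class BatchSender {

  public static void main(String[] args) {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
    props.put("schema.registry.url", "http://127.0.0.1:8081");

    try (KafkaProducer<String, Customer> producer = new KafkaProducer<>(props)) {
      for (int i = 0; i < 12; i++) {
        // Assumes the Customer Avro schema has a "name" field
        Customer customer = Customer.newBuilder().setName("customer-" + i).build();
        producer.send(new ProducerRecord<>("kafka-demo-avro", String.valueOf(i), customer));
      }
      // close() also flushes; explicit here for clarity
      producer.flush();
    }
  }
}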

You will see each failed message retried indefinitely instead of the maxFailures times I specified, and the consumer lag keeps growing as failed messages pile up.

However, if you stop the listener and start it again, the failed messages are skipped properly.

Answer 1:

This is a bug in Spring Kafka 2.2.7.RELEASE; it is fixed in 2.2.8.RELEASE.
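Once on 2.2.8, the recoverer hook in your config can also do more than log: the same constructor accepts a DeadLetterPublishingRecoverer (available since 2.2), which publishes the record to a dead-letter topic after maxFailures attempts. A minimal sketch, assuming a KafkaTemplate<Object, Object> bean is already configured:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;

@Configuration
public class ErrorHandlingConfig {

  @Bean
  public SeekToCurrentErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {
    // After 3 failed deliveries, publish the record to <topic>.DLT
    // instead of seeking back and retrying again
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
    return new SeekToCurrentErrorHandler(recoverer, 3);
  }
}

By default the record is sent to <originalTopic>.DLT on the same partition, so the dead-letter topic needs at least as many partitions as the original.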



Answer 2:

I'm using 2.2.8 and I can still reproduce the bug, though with a somewhat more involved configuration: I need both the SeekToCurrentErrorHandler and a RetryTemplate with an ExponentialRandomBackOffPolicy. So there is still a risk of running into it. Reported here.
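For reference, a sketch of that combination, placed inside the kafkaListenerContainerFactory() bean from the question. It assumes spring-retry is on the classpath (RetryTemplate, ExponentialRandomBackOffPolicy, and SimpleRetryPolicy come from org.springframework.retry); the interval values are illustrative only.

// Stateless in-memory retries with randomized exponential backoff,
// layered on top of the SeekToCurrentErrorHandler: the combination
// this answer says still reproduces the bug on 2.2.8
RetryTemplate retryTemplate = new RetryTemplate();
ExponentialRandomBackOffPolicy backOffPolicy = new ExponentialRandomBackOffPolicy();
backOffPolicy.setInitialInterval(1000);
backOffPolicy.setMultiplier(2.0);
backOffPolicy.setMaxInterval(10000);
retryTemplate.setBackOffPolicy(backOffPolicy);
retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3));

factory.setRetryTemplate(retryTemplate);
factory.setErrorHandler(new SeekToCurrentErrorHandler(
    (record, exception) -> log.info("***in error handler data, {}", record), 1));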