Spring Kafka SeekToCurrentErrorHandler Find Out Wh

2020-06-19 07:02发布

问题:

I have implemented a Kafka consumer with KafkaHandler. My consumer is supposed to consume events, then send a REST request to some other service for each event. I want to retry only if that REST service is down. Otherwise, I can ignore the failed event.

My container factory is configured as below:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent>
  kafkaListenerContainerFactory() {

  ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent> factory =
    new ConcurrentKafkaListenerContainerFactory<>();

  factory.setConsumerFactory(consumerFactory());
  factory.setStatefulRetry(true);
  factory.setRetryTemplate(retryTemplate());
  factory.setConcurrency(3);

  ContainerProperties containerProperties = factory.getContainerProperties();
  containerProperties.setAckOnError(false);
  containerProperties.setAckMode(AckMode.RECORD);
  containerProperties.setErrorHandler(new SeekToCurrentErrorHandler());

  return factory;
}

I am using ExceptionClassifierRetryPolicy for setting the exceptions and the corresponding retry policies.

Everything looks good with retrying. It retries when I get a ConnectException and it ignores when I get an IllegalArgumentException.

However, in IllegalArgumentException scenario, SeekToCurrentErrorHandler seeks back to the unprocessed offset (because it seeks back for unprocessed messages including the failed one) which ends up with an immediate retry of the failed message. Consumer constantly goes back and forth and retries million times.

If I had the chance to understand which record has failed in SeekToCurrentErrorHandler, then I would create a custom implementation of SeekToCurrentErrorHandler to check if the failed message is retryable or not (by using the thrownException field). If it is not retryable, then I would remove it from the list of records to seek back.

Any ideas about how to achieve this functionality?

Note: enable.auto.commit is set to false, auto.offset.reset is set to earliest.

Thank you!

回答1:

There is a FailedRecordTracker since Spring for Apache Kafka 2.2 (not released yet):

https://docs.spring.io/spring-kafka/docs/2.2.0.M2/reference/html/whats-new-part.html#_listener_container_changes

Starting with version 2.2, the SeekToCurrentErrorHandler can now recover (skip) a record that keeps failing. By default, after 10 failures, the failed record will be logged (ERROR). You can configure the handler with a custom recoverer (BiConsumer) and/or max failures.

SeekToCurrentErrorHandler errorHandler =
    new SeekToCurrentErrorHandler((record, exception) -> {
          // recover after 3 failures - e.g. send to a dead-letter topic
          }, 3);

So, what you need is just copy/paste a FailedRecordTracker and SeekToCurrentErrorHandler source code from the master into your project and you will have a functionality you are seeking for:

https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java

https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java