I have implemented a Kafka consumer with KafkaHandler. My consumer is supposed to consume events and then send a REST request to another service for each event. I want to retry only if that REST service is down; otherwise, I can ignore the failed event.
My container factory is configured as below:
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent>
            kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setStatefulRetry(true);
        factory.setRetryTemplate(retryTemplate());
        factory.setConcurrency(3);

        ContainerProperties containerProperties = factory.getContainerProperties();
        containerProperties.setAckOnError(false);
        containerProperties.setAckMode(AckMode.RECORD);
        containerProperties.setErrorHandler(new SeekToCurrentErrorHandler());

        return factory;
    }
I am using ExceptionClassifierRetryPolicy to set the exceptions and their corresponding retry policies.
Retrying itself works as expected: the consumer retries when I get a ConnectException and does not retry when I get an IllegalArgumentException.
However, in the IllegalArgumentException scenario, SeekToCurrentErrorHandler seeks back to the unprocessed offsets (it seeks back for all unprocessed messages, including the failed one), which results in an immediate redelivery of the failed message. The consumer keeps going back and forth and retries the same message endlessly.
If I could tell which record failed in SeekToCurrentErrorHandler, I would create a custom implementation of SeekToCurrentErrorHandler that checks whether the failed message is retryable (by using the thrownException field). If it is not retryable, I would remove it from the list of records to seek back to.
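To make the check concrete, here is a minimal sketch of the classification step I have in mind, in plain Java with no Spring dependencies. The class name RetryableFailureClassifier and the method isRetryable are hypothetical names I made up for illustration; they are not part of Spring Kafka or Spring Retry. It assumes that ConnectException is the only failure that means "the REST service is down", and it walks the cause chain because, as far as I understand, the listener container wraps the original exception before passing it to the error handler.

```java
import java.net.ConnectException;
import java.util.List;

/**
 * Hypothetical helper (not a Spring Kafka class): decides whether a
 * failure should be retried or the record should be skipped.
 */
final class RetryableFailureClassifier {

    // Assumption: only connection failures indicate that the REST service is down.
    private static final List<Class<? extends Throwable>> RETRYABLE =
            List.of(ConnectException.class);

    // Upper bound on how far the cause chain is followed (guards against cyclic causes).
    private static final int MAX_CAUSE_DEPTH = 32;

    private RetryableFailureClassifier() {
    }

    /**
     * Returns true if the thrown exception, or any exception in its cause
     * chain, is an instance of one of the retryable types.
     */
    static boolean isRetryable(Throwable thrown) {
        int depth = 0;
        for (Throwable t = thrown; t != null && depth < MAX_CAUSE_DEPTH; t = t.getCause(), depth++) {
            for (Class<? extends Throwable> type : RETRYABLE) {
                if (type.isInstance(t)) {
                    return true;
                }
            }
        }
        return false;
    }
}
```

The idea would be to call isRetryable(thrownException) from the custom error handler and leave the failed record out of the seek when it returns false. What I am missing is a reliable way to know which record in the list is the one that failed.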
Any ideas about how to achieve this functionality?
Note: enable.auto.commit is set to false and auto.offset.reset is set to earliest.
Thank you!