I am using Avro and Schema registry with my Spring Kafka setup.
I would like to somehow handle the SerializationException, which might be thrown during deserialization.
I found the following two resources:
https://github.com/spring-projects/spring-kafka/issues/164
How do I configure spring-kafka to ignore messages in the wrong format?
These resources suggest that I return null instead of throwing a SerializationException when deserializing, and listen for KafkaNull. This solution works just fine.
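For reference, the "return null" approach suggested by those resources amounts to something like the sketch below. It is plain Java with no Kafka dependency: NullOnFailureDecoder and its decode method are hypothetical stand-ins for a Kafka Deserializer and its deserialize method, and the delegate function stands in for the real Avro decoding.

```java
import java.util.function.Function;

// Hypothetical stand-in for a Kafka Deserializer (not a real Kafka or Spring class).
// It wraps a decoding function and returns null instead of propagating a failure.
final class NullOnFailureDecoder<T> {

    private final Function<byte[], T> delegate;

    NullOnFailureDecoder(Function<byte[], T> delegate) {
        this.delegate = delegate;
    }

    T decode(byte[] payload) {
        try {
            return delegate.apply(payload);
        } catch (RuntimeException e) {
            // A real deserializer would catch SerializationException here and probably log it.
            // Returning null makes the consumer see a null value instead of an exception.
            return null;
        }
    }
}
```

With a delegate that parses an integer from UTF-8 bytes, decode returns the parsed number for valid input and null for input that fails to parse. The drawback is the one that motivates this question: every consumer of the decoder has to be prepared for a null value.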
I would however like to be able to throw an exception instead of returning null.
KIP-161 and KIP-210 provide better features for handling exceptions. I did find some resources mentioning KIP-161 in Spring Cloud, but nothing specific to Spring Kafka.
Does anyone know how to catch SerializationException in Spring Boot?
I am using Spring Boot 2.0.2.
Edit: I found a solution.
I would rather throw an exception and catch it than have to return null or KafkaNull. I am using my custom Avro serializer and deserializer in multiple projects, some of which do not use Spring. If I changed my Avro serializer and deserializer, some of the other projects would need to be changed to expect the deserializer to return null.
I would like to shut down the container so that I do not lose any messages. A SerializationException should never be expected in production: it should only be possible if Schema Registry is down or if a malformed message is somehow sent to the production Kafka cluster. Either way, it should happen very rarely, and when it does I want to shut down the container so that no messages are lost and I can investigate the issue.
Just take into consideration that ContainerStoppingErrorHandler will catch all exceptions from your consumer container. In my specific case I only want to shut down if it is a SerializationException:
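import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
// Assumption: Kafka's own KafkaException; org.springframework.kafka.KafkaException would also fit.
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.SerializationException;
import org.springframework.kafka.listener.ContainerStoppingErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;

public class SerializationExceptionHandler extends ContainerStoppingErrorHandler {

    @Override
    public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
            MessageListenerContainer container) {
        // Only call super if the exception is a SerializationException
        if (thrownException instanceof SerializationException) {
            // This will shut down the container.
            super.handle(thrownException, records, consumer, container);
        } else {
            // Wrap and re-throw any other exception
            throw new KafkaException("Kafka Consumer Container Error", thrownException);
        }
    }
}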
This handler is passed to the consumer container. Below is an example of a KafkaListenerContainerFactory bean.
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
        kafkaListenerContainerFactory(JpaTransactionManager jpa, KafkaTransactionManager<?, ?> kafka) {
    // consumerFactory() and chainedTxM(...) are defined elsewhere in the same configuration class.
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(1);
    factory.getContainerProperties().setPollTimeout(3000);
    // Register the custom handler so that a SerializationException stops the container.
    factory.getContainerProperties().setErrorHandler(new SerializationExceptionHandler());
    factory.getContainerProperties().setTransactionManager(chainedTxM(jpa, kafka));
    return factory;
}
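If you want to check the branching of SerializationExceptionHandler without a broker, one option is to model the decision in plain Java first. The sketch below is only an illustration: SelectiveStopPolicy is a hypothetical class, the Runnable stands in for stopping the container, the predicate stands in for the instanceof SerializationException check, and IllegalStateException stands in for KafkaException.

```java
import java.util.function.Predicate;

// Plain-Java model of the handler's branching. Every type here is a hypothetical
// stand-in; nothing in this sketch is part of Kafka or Spring Kafka.
final class SelectiveStopPolicy {

    private final Predicate<Exception> isFatal;   // stands in for "instanceof SerializationException"
    private final Runnable stopContainer;         // stands in for stopping the listener container

    SelectiveStopPolicy(Predicate<Exception> isFatal, Runnable stopContainer) {
        this.isFatal = isFatal;
        this.stopContainer = stopContainer;
    }

    void handle(Exception thrown) {
        if (isFatal.test(thrown)) {
            // Fatal case: stop consuming so that no further messages are processed.
            stopContainer.run();
        } else {
            // Any other exception is wrapped and re-thrown, as in the handler above.
            throw new IllegalStateException("Kafka Consumer Container Error", thrown);
        }
    }
}
```

A test can then pass a flag-setting Runnable and assert that it fires only for the exception type treated as fatal, while other exceptions come back wrapped with the original as the cause.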