I've got an integration flow that looks like the following:
@Bean
public IntegrationFlow auditFlow(@Qualifier("eventLoggingConnectionFactory") ConnectionFactory connectionFactory,
@Qualifier("writeChannel") MessageChannel messageChannel,
@Qualifier("parseErrorChannel") MessageChannel errorChannel) {
return IntegrationFlows
.from(Amqp.inboundAdapter(connectionFactory, auditQueue)
.errorChannel(errorChannel)
.concurrentConsumers(numConsumers)
.messageConverter(new MongoMessageConverter())) // converts JSON to org.bson.Document
.enrichHeaders(e -> e.<Document>headerFunction(MongoWriteConfiguration.MONGO_COLLECTION_HEADER_KEY,
o -> getNamespace(o.getPayload())))
.channel(messageChannel)
.get();
}
The message converter can of course err if a malformed message is introduced, raising a MessageConversionException. In that case, I of course don't want the message requeued -- but I also don't want to set the default to NOT requeue rejected message, as I can on do on the AmqpInboundAdapterSpec. What's the right way for me to not requeue messages that err in that manner (and otherwise republish them for debugging purposes)?
More generally, downstream processes in the same flow can err for data that's more semantically malformed -- once again, I don't want to requeue them. I could throw an AmqpRejectAndDontRequeueException
at those times, but then I'm losing separation of concerns, which is half the point of this. What's the right way to act on those exceptions -- perhaps there's a way to translate to AmqpRejectAndDontRequeueException
?
The default error handler in the inbound adapter's
SimpleMessageListenerContainer
(aConditionalRejectingErrorHandler
) does just that (it detects aMessageConversionException
and throws anAmqpRejectAndDontRequeueException
).You can customize a
ConditionalRejectingErrorHandler
by injecting your ownFatalExceptionStrategy
to look for other exception types and handle them the same way.The
DefaultExceptionStrategy
looks like this...It is only invoked if there's not already an
AmqpRejectAndDontRequeueException
in the cause chain.