Spring Integration Java DSL using JMS retry/redliv

2019-08-23 11:02发布

问题:

How can I effectively support JMS redelivery when msg handling throws an exception?

I have a flow using JMS (ActiveMQ) with a connectionFactory that is configured to allow n redelivery attempts.

I would like to have any error that occurs while handling the msg cause the msg to get put back for redelivery as many times as the connectionFactory config allows and then when max redelivery attempts are exhausted, deliver to DLQ. per usual with AMQ.

An answer to a related SO question implies that I could have an errorChannel that re-throws which should trigger redelivery: Spring Integration DSL ErrorHandling

But, with the following that isnt happening:

/***
 * Dispatch msgs from JMS queue to a handler using a rate-limit
 * @param connectionFactory
 * @return
 */
@Bean
public IntegrationFlow flow2(@Qualifier("spring-int-connection-factory") ConnectionFactory connectionFactory) {

    IntegrationFlow flow =  IntegrationFlows.from(
            Jms.inboundAdapter(connectionFactory)
                    .configureJmsTemplate(t -> t.receiveTimeout(1000))
                    .destination(INPUT_DIRECT_QUEUE),
            e -> e.poller(Pollers
                    .fixedDelay(5000)
                    .errorChannel("customErrorChannel")
                    //.errorHandler(this.msgHandler)
                    .maxMessagesPerPoll(2))
    ).handle(this.msgHandler).get();

    return flow;
}

@Bean
public MessageChannel customErrorChannel() {
    return MessageChannels.direct("customErrorChannel").get();
}

@Bean
public IntegrationFlow customErrorFlow() {
    return IntegrationFlows.from(customErrorChannel())
            .handle ("simpleMessageHandler","handleError")
            .get();
}

The errorChannel method impl:

   public void handleError(Throwable t) throws Throwable {
        log.warn("got error from customErrorChannel");
        throw t;
    }

When an exception is thrown from the handler in flow2, the errorChannel does get the exception but then the re-throw causes a MessageHandlingException:

2018-08-13 09:00:34.221  WARN 98425 --- [ask-scheduler-5] c.v.m.i.jms.SimpleMessageHandler         : got error from customErrorChannel
2018-08-13 09:00:34.224  WARN 98425 --- [ask-scheduler-5] o.s.i.c.MessagePublishingErrorHandler    : Error message was not delivered.

org.springframework.messaging.MessageHandlingException: nested exception is org.springframework.messaging.MessageHandlingException: error occurred in message handler [simpleMessageHandler]; nested exception is java.lang.IllegalArgumentException: dont want first try, failedMessage=GenericMessage [payload=Enter some text here for the message body..., headers={jms_redelivered=false, jms_destination=queue://_dev.directQueue, jms_correlationId=, jms_type=, id=c2dbffc8-8ab0-486f-f2e5-e8d613d62b6a, priority=0, jms_timestamp=1534176031021, jms_messageId=ID:che2-39670-1533047293479-4:9:1:1:8, timestamp=1534176034205}]
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:107) ~[spring-integration-core-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.integration.handler.BeanNameMessageProcessor.processMessage(BeanNameMessageProcessor.java:61) ~[spring-integration-core-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93) ~[spring-integration-core-5.0.7.RELEASE.jar:5.0.7.RELEASE]

回答1:

It would work with a message-driven channel adapter but I presume that's not what you want because of this question.

Since the polled adapter uses a JmsTemplate.receive() operation, the message is already ack'd by the time the flow is called.

You need to use a transactional poller with a JmsTransactionManager so that the exception thrown by the error flow rolls back the transaction and the message will be redelivered.