Don't ack RabbitMQ messages until IntegrationF

Posted 2019-08-20 09:41

Question:

I have an integration flow defined like this:

IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
                    .id("id")
                    .autoStartup(autoStartup)
                    .concurrentConsumers(2)
                    .maxConcurrentConsumers(3)
                    .messageConverter(messageConverter()))
                    .aggregate(a -> ...)
                    .handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
                    .get();

Where the serviceActivatorBean is defined like this:

@Component
@Transactional
public class ServiceActivator {

    @ServiceActivator    
    public void myMethod(Collection<MyEvent> events) {
        ....
    }    
}

And requestHandlerRetryAdviceForIntegrationFlow() is defined like this:

public static RequestHandlerRetryAdvice requestHandlerRetryAdviceForIntegrationFlow() {
    RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
    RetryTemplate retryTemplate = new RetryTemplate();
    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(MAX_VALUE);
    retryTemplate.setRetryPolicy(retryPolicy);
    retryTemplate.setListeners(new RetryListenerSupport[]{new RetryListenerSupport() {
        @Override
        public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
            log.error("Caught exception {} (retry count {}), will retry again!", throwable.getClass().getSimpleName(),
                    context.getRetryCount(), throwable);
        }
    }});
    advice.setRetryTemplate(retryTemplate);
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setMaxInterval(5000L);
    backOffPolicy.setInitialInterval(200L);
    backOffPolicy.setMultiplier(2);
    retryTemplate.setBackOffPolicy(backOffPolicy);
    return advice;
}

The problem arises when the events collection passed to the service activator contains two or more events and the server crashes while myMethod is processing them. The IntegrationFlow appears to consume and ack one message at a time from RabbitMQ, so if the server crashes during myMethod, all events except the last one are lost. This is not safe enough for us. Is there a way to configure the IntegrationFlow so that it does NOT ack any message until myMethod in the service activator has completed successfully?

Answer 1:

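You can set the acknowledge mode to MANUAL and ack the messages yourself afterwards. In that mode the inbound adapter puts the channel and the delivery tag into the message headers (AmqpHeaders.CHANNEL and AmqpHeaders.DELIVERY_TAG), and you call basicAck on the channel once processing has succeeded: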

https://docs.spring.io/spring-integration/docs/4.3.12.RELEASE/reference/html/amqp.html#amqp-inbound-channel-adapter
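
A minimal sketch of the "ack only after myMethod succeeds" idea, in plain Java so it runs without a broker. AckChannel, Delivery and processThenAck are hypothetical names made up for this illustration; AckChannel only mirrors the basicAck(long, boolean) method of the RabbitMQ client's Channel. In the real flow the channel and tag would come from the AmqpHeaders.CHANNEL and AmqpHeaders.DELIVERY_TAG headers of each incoming message. Because the flow aggregates several messages into one, each message's channel and tag would probably have to be kept next to its payload (for example by wrapping it before the aggregator), since an aggregated message cannot carry one delivery tag per event in a single header.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class AckAfterProcessing {

    /** Hypothetical stand-in mirroring Channel.basicAck(long, boolean) from the RabbitMQ client. */
    interface AckChannel {
        void basicAck(long deliveryTag, boolean multiple) throws IOException;
    }

    /** Hypothetical wrapper: one event plus the channel and delivery tag it arrived with. */
    static final class Delivery<T> {
        final T payload;
        final AckChannel channel;
        final long deliveryTag;

        Delivery(T payload, AckChannel channel, long deliveryTag) {
            this.payload = payload;
            this.channel = channel;
            this.deliveryTag = deliveryTag;
        }
    }

    /**
     * Runs the handler on the whole batch and acks each delivery only if the handler
     * returns normally. If the handler throws, no ack is sent and the messages stay
     * unacknowledged on the broker.
     */
    static <T> void processThenAck(List<Delivery<T>> batch, Consumer<List<T>> handler) throws IOException {
        List<T> payloads = new ArrayList<>();
        for (Delivery<T> delivery : batch) {
            payloads.add(delivery.payload);
        }
        handler.accept(payloads); // an exception here skips the ack loop below
        for (Delivery<T> delivery : batch) {
            // multiple=false acks exactly this tag and leaves other in-flight messages untouched
            delivery.channel.basicAck(delivery.deliveryTag, false);
        }
    }

    public static void main(String[] args) throws IOException {
        List<Long> acked = new ArrayList<>();
        // Records acked tags instead of talking to a real broker
        AckChannel channel = (tag, multiple) -> acked.add(tag);

        List<Delivery<String>> batch = new ArrayList<>();
        batch.add(new Delivery<>("event-1", channel, 1L));
        batch.add(new Delivery<>("event-2", channel, 2L));

        processThenAck(batch, events -> System.out.println("processing " + events));
        System.out.println("acked tags: " + acked);
    }
}
```

Each tag is acked individually with multiple=false on purpose: acking with multiple=true would also acknowledge every earlier unacked message on that channel, including ones that belong to a group that has not been processed yet. With MANUAL mode a crash before the ack loop leaves all messages of the batch unacknowledged, so RabbitMQ redelivers them once the connection is gone.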