I have an integration flow defined like this:
IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
                .id("id")
                .autoStartup(autoStartup)
                .concurrentConsumers(2)
                .maxConcurrentConsumers(3)
                .messageConverter(messageConverter()))
        .aggregate(a -> ...)
        .handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
        .get();
The serviceActivatorBean is defined like this:
@Component
@Transactional
public class ServiceActivator {
    @ServiceActivator
    public void myMethod(Collection<MyEvent> events) {
        ....
    }
}
And requestHandlerRetryAdviceForIntegrationFlow() is defined like this:
public static RequestHandlerRetryAdvice requestHandlerRetryAdviceForIntegrationFlow() {
    RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
    RetryTemplate retryTemplate = new RetryTemplate();
    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(MAX_VALUE);
    retryTemplate.setRetryPolicy(retryPolicy);
    retryTemplate.setListeners(new RetryListenerSupport[]{new RetryListenerSupport() {
        @Override
        public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
            log.error("Caught exception {} (retry count {}), will retry again!", throwable.getClass().getSimpleName(),
                    context.getRetryCount(), throwable);
        }
    }});
    advice.setRetryTemplate(retryTemplate);
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setMaxInterval(5000L);
    backOffPolicy.setInitialInterval(200L);
    backOffPolicy.setMultiplier(2);
    retryTemplate.setBackOffPolicy(backOffPolicy);
    return advice;
}
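To make the retry timing concrete, here is a small plain-Java sketch of the wait schedule I expect from these back-off settings. It does not use Spring Retry: the names BackOffScheduleSketch and sleeps are made up for illustration, and it assumes the policy sleeps for the current interval, multiplies the interval after each attempt, and caps the sleep at the maximum interval.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spring Retry: models an exponential
// back-off that is capped at a maximum interval.
class BackOffScheduleSketch {

    // Returns the sleep (in ms) before each of the first `retries` retries.
    static List<Long> sleeps(long initialInterval, double multiplier, long maxInterval, int retries) {
        List<Long> result = new ArrayList<>();
        long interval = initialInterval;
        for (int i = 0; i < retries; i++) {
            // Sleep for the current interval, but never longer than the cap.
            result.add(Math.min(interval, maxInterval));
            // Assumption: the interval stops growing once it has passed the cap.
            if (interval <= maxInterval) {
                interval = (long) (interval * multiplier);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Same values as in the advice above: 200 ms initial, x2, 5000 ms cap.
        System.out.println(sleeps(200L, 2.0, 5000L, 7));
    }
}
```

Under those assumptions the first seven waits are 200, 400, 800, 1600, 3200, 5000 and 5000 ms, so with MAX_VALUE attempts a failing batch is retried roughly every 5 seconds once the cap is reached.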
The problem occurs when the events collection passed to the service activator contains two or more events and the server crashes while myMethod is processing them. What seems to happen is that the IntegrationFlow consumes and acks one message at a time from RabbitMQ, so if the server crashes during the processing of myMethod, all events except the last one are lost. This is neither good nor safe enough for us. Is there a way to configure the IntegrationFlow so that it does NOT ack any message until myMethod in the service activator has completed successfully?