How to retry a failed call to a Webflux.outboundga

Posted 2020-04-17 05:30

Question:

I have a Spring Integration flow defined with the Java DSL. One of my handlers is a WebFlux.outboundGateway. When the remote URI is not accessible, an exception is thrown and sent to the "errorChannel". I'm trying to make the flow retry the call, but so far without success (the call is never retried). Here is what my configuration looks like:

@Bean
public IntegrationFlow retriableFlow() {
    return IntegrationFlows
            .from(...)
            .handle(
                    WebFlux.outboundGateway(m ->
                        UriComponentsBuilder.fromUriString(remoteGateway + "/foo/bar")
                                            .build()
                                            .toUri(), webClient)
                    .httpMethod(HttpMethod.POST)
                    .expectedResponseType(String.class)
                    .replyPayloadToFlux(true), e -> e.advice(retryAdvice())
            )
            // [ ... ]
            .get();
}

@Bean
public Advice retryAdvice() {
    RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
    RetryTemplate retryTemplate = new RetryTemplate();
    // Exponential back-off between attempts: starts at 1 s, capped at 20 s
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(1000);
    backOffPolicy.setMaxInterval(20000);
    retryTemplate.setBackOffPolicy(backOffPolicy);
    advice.setRetryTemplate(retryTemplate);
    return advice;
}

Should I be using something other than the RequestHandlerRetryAdvice? If so, what should it be?

Answer 1:

WebFlux is, by definition, async, which means the reply Mono is completed asynchronously when the request finishes or fails, not on the calling thread. Hence the advice won't help: from its point of view, the "send" part of the request always succeeds.
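To make the point concrete, here is a minimal, library-free sketch of why a retry wrapper around an async call never fires. It uses CompletableFuture as a stand-in for the reply Mono; `retrySynchronously` and `asyncCall` are hypothetical names that only model the advice and the gateway, they are not Spring Integration APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class AsyncRetryDemo {

    // Stand-in for a retry advice: re-invokes the call only when it throws
    // on the calling thread. Assumes maxAttempts >= 1.
    static <T> T retrySynchronously(Supplier<T> call, int maxAttempts, AtomicInteger attempts) {
        RuntimeException last = null;
        for (int i = 0; i < maxAttempts; i++) {
            attempts.incrementAndGet();
            try {
                return call.get();
            } catch (RuntimeException e) {
                last = e;
            }
        }
        throw last;
    }

    // Stand-in for the async gateway: returns a future immediately;
    // the failure only surfaces later, inside that future.
    static CompletableFuture<String> asyncCall() {
        return CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("remote URI not reachable");
        });
    }

    // The wrapper sees a normally returned future, so it never retries.
    static int attemptsForAsyncFailure() {
        AtomicInteger attempts = new AtomicInteger();
        CompletableFuture<String> reply =
                retrySynchronously(AsyncRetryDemo::asyncCall, 3, attempts);
        reply.exceptionally(ex -> "failed").join(); // wait until the failure has happened
        return attempts.get();
    }

    // Blocking inside the wrapped call makes the failure visible to the wrapper.
    static int attemptsForBlockingFailure() {
        AtomicInteger attempts = new AtomicInteger();
        try {
            retrySynchronously(() -> asyncCall().join(), 3, attempts);
        } catch (RuntimeException expected) {
            // all attempts failed, as intended for this demo
        }
        return attempts.get();
    }

    public static void main(String[] args) {
        System.out.println(attemptsForAsyncFailure());    // prints 1
        System.out.println(attemptsForBlockingFailure()); // prints 3
    }
}
```

The async variant is invoked once because the wrapper only ever observes a successfully returned future; the blocking variant is retried, at the cost of tying up the calling thread.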

You would have to perform retries via a flow on the error channel (assigned somewhere near the start of the flow), perhaps with a header indicating how many times you have already retried.

The ErrorMessage payload (a MessagingException) has the properties failedMessage and cause; you can resend the failedMessage.
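The resend-with-a-counter idea can be sketched without any framework. The following is a plain-Java model, not Spring Integration code: `Msg` stands in for a message, `onError` plays the role of the flow subscribed to the error channel, and `retryCount` is a hypothetical header name. It has no back-off delay; a real flow would likely add one before resending.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class ErrorChannelRetryModel {

    static final String RETRY_COUNT = "retryCount"; // hypothetical header name

    // Minimal stand-in for a message: payload plus immutable headers.
    static final class Msg {
        final String payload;
        final Map<String, Object> headers;

        Msg(String payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
        }
    }

    // Sends the request through the async gateway; failures are routed to onError.
    static CompletableFuture<String> send(
            Msg request, Function<Msg, CompletableFuture<String>> gateway, int maxRetries) {
        return gateway.apply(request)
                .<CompletableFuture<String>>handle((reply, error) -> error == null
                        ? CompletableFuture.completedFuture(reply)
                        : onError(request, error, gateway, maxRetries))
                .thenCompose(f -> f);
    }

    // Models the error-channel flow: read the counter, give up or resend the failed message.
    static CompletableFuture<String> onError(
            Msg failedMessage, Throwable cause,
            Function<Msg, CompletableFuture<String>> gateway, int maxRetries) {
        int retries = (Integer) failedMessage.headers.getOrDefault(RETRY_COUNT, 0);
        if (retries >= maxRetries) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(cause);
            return failed;
        }
        Map<String, Object> headers = new HashMap<>(failedMessage.headers);
        headers.put(RETRY_COUNT, retries + 1);
        return send(new Msg(failedMessage.payload, headers), gateway, maxRetries);
    }

    // Runs the model against a gateway that fails a fixed number of times first.
    static String demo(int failuresBeforeSuccess, int maxRetries) {
        AtomicInteger calls = new AtomicInteger();
        Function<Msg, CompletableFuture<String>> flaky = msg -> {
            CompletableFuture<String> f = new CompletableFuture<>();
            if (calls.incrementAndGet() <= failuresBeforeSuccess) {
                f.completeExceptionally(new IllegalStateException("unreachable"));
            } else {
                f.complete("reply to " + msg.payload + " after "
                        + msg.headers.getOrDefault(RETRY_COUNT, 0) + " retries");
            }
            return f;
        };
        return send(new Msg("foo", new HashMap<>()), flaky, maxRetries)
                .exceptionally(ex -> "gave up after " + calls.get() + " calls")
                .join();
    }

    public static void main(String[] args) {
        System.out.println(demo(2, 3)); // prints "reply to foo after 2 retries"
        System.out.println(demo(5, 3)); // prints "gave up after 4 calls"
    }
}
```

Keeping the counter in a header means the retry state travels with the message, so the error handler itself stays stateless.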

You could turn off async so the calling thread blocks, but that really defeats the whole purpose of using WebFlux.