How to make fallback for circuit breaker invoked o

2019-08-26 17:02发布

问题:

I have the following policies:

var sharedBulkhead = Policy.BulkheadAsync(
            maxParallelization: maxParallelizations, 
            maxQueuingActions: maxQueuingActions,
            onBulkheadRejectedAsync: (context) =>
            {
                Log.Info($"Bulk head rejected => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}");
                return TaskHelper.EmptyTask;
            }
        );

var retryPolicy = Policy.Handle<HttpRequestException>()
.Or<BrokenCircuitException>().WaitAndRetryAsync(
            retryCount: maxRetryCount,
            sleepDurationProvider: attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),
            onRetryAsync: (exception, calculatedWaitDuration, retryCount, context) =>
            {
                Log.Error($"Retry => Count: {retryCount}, Wait duration: {calculatedWaitDuration}, Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}, Exception: {exception}.");
                return TaskHelper.EmptyTask;
            });

            var circuitBreaker = Policy.Handle<Exception>(e => (e is HttpRequestException)).CircuitBreakerAsync(
            exceptionsAllowedBeforeBreaking: maxExceptionsBeforeBreaking, 
            durationOfBreak: TimeSpan.FromSeconds(circuitBreakDurationSeconds), 
            onBreak: (exception, timespan, context) =>
            {
                Log.Error($"Circuit broken => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}, Exception: {exception}");
            },
            onReset: (context) =>
            {
                Log.Info($"Circuit reset => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}");
            }
        );

var fallbackForCircuitBreaker = Policy<bool>
         .Handle<BrokenCircuitException>()
         .FallbackAsync(
             fallbackValue: false,
             onFallbackAsync: (b, context) =>
             {
                 Log.Error($"Operation attempted on broken circuit => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}");
                 return TaskHelper.EmptyTask;
             }
         );

var fallbackForAnyException = Policy<bool>
            .Handle<Exception>()
            .FallbackAsync(
                fallbackAction: (ct, context) => { return Task.FromResult(false); },
                onFallbackAsync: (e, context) =>
                {
                    Log.Error($"An unexpected error occured => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}");
                    return TaskHelper.EmptyTask;
                }
            );


var resilienceStrategy = Policy.WrapAsync(retryPolicy, circuitBreaker, sharedBulkhead);
        var policyWrap = fallbackForAnyException.WrapAsync(fallbackForCircuitBreaker.WrapAsync(resilienceStrategy));

Now, fallbackForCircuitBreaker is only invoked if all retries fail, and if the last retry fails with BrokenCircuitException. What changes should be made in order for fallbackForCircuitBreaker to be invoked every time a retry is made on a broken circuit?

Also, I am using a sharedBulkHead which is an instance field in the service and is initialized in the constructor. Is that a good practise? What is to be done ideally on onBulkheadRejectedAsync? Can I modify the retry policy to handle bulk head rejection as well?

回答1:

Now, fallbackForCircuitBreaker is only invoked if all retries fail, and if the last retry fails with BrokenCircuitException. What changes should be made in order for fallbackForCircuitBreaker to be invoked every time a retry is made on a broken circuit?

See the PolicyWrap documentation, especially the diagrams and description of operation. Policies in a PolicyWrap act like a sequence of nested middleware around the call:

  • the outermost (leftmost in reading order) policy executes the next inner, which executes the next inner, etc; until the innermost policy executes the user delegate;
  • a thrown exception bubbles back outwards (until handled) through the layers

So, to make (an equivalent to) fallbackForCircuitBreaker invoked per try, move it inside the retry policy.

The current fallbackForCircuitBreaker however substitutes the thrown exception with a return value false, whereas it sounds like what you are seeking from fallback-per-try is a 'log, then make the next try'. The technique for that is to use fallback as log then rethrow, so that your retry policy can still respond to the (rethrown) exception. So:

var sharedBulkhead = /* as before */;
var retryPolicy = /* as before */;
var fallbackForCircuitBreaker = /* as before */;
var logCircuitBreakerBrokenPerTry = Policy<bool>
     .Handle<BrokenCircuitException>()
     .FallbackAsync(
         fallbackValue: false,
         onFallbackAsync: (outcome, context) =>
         {
             Log.Error($"Operation attempted on broken circuit => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}");
             throw outcome.Exception;
         }
     );
var fallbackForAnyException = /* as before */;

var resilienceStrategy = Policy.WrapAsync(retryPolicy, logCircuitBreakerBrokenPerTry, circuitBreaker, sharedBulkhead);
var policyWrap = fallbackForAnyException.WrapAsync(fallbackForCircuitBreaker.WrapAsync(resilienceStrategy));

Can I modify the retry policy to handle bulk head rejection as well?

The Polly bulkhead documentation states the policy throws BulkheadRejectedException, so:

var retryPolicy = Policy
    .Handle<HttpRequestException>()
    .Or<BrokenCircuitException>()
    .Or<BulkheadRejectedException>()
    /* etc */

What is to be done ideally on onBulkheadRejectedAsync?

You can log. Broadly speaking, you can shed the excess load, or use the bulkhead rejection as a trigger to scale horizontally to increase capacity. The Polly documentation provides more discussion here and here.

Also, I am using a sharedBulkHead which is an instance field in the service and is initialized in the constructor. Is that a good practise?

It depends. The lifetime of the Bulkhead policy instance must be long-lived across the governed calls, not per call, in order for the state of the Bulkhead policy instance to govern the number of calls executing concurrently.

  • If the service exists as a long-lived singleton, holding the bulkhead policy in an instance field would be appropriate as the bulkhead policy instance would also be long lived.
  • If instances of the service class are created as transient/per-request by a DI container, you would need to ensure the bulkhead policy instance was still long-lived and shared across concurrent requests (eg by making it static), not per-request.
  • If instances of the bulkhead are configured on an HttpClient by HttpClientFactory, follow the notes on scoping stateful policies with HttpClientFactory in the Polly and HttpClientFactory documentation.


标签: c# asp.net polly