Reactive Extensions and Retry

2019-06-12 12:06发布

So a series of articles popped on my radar this morning. It started with this question, which lead to the original example and source code on GitHub.

I rewrote it slightly, so I can start using it in Console and Service applications:

public static class Extensions
{
    static readonly TaskPoolScheduler Scheduler = new TaskPoolScheduler(new TaskFactory());

    // Licensed under the MIT license with <3 by GitHub

    /// <summary>
    /// An exponential back off strategy which starts with 1 second and then 4, 8, 16...
    /// </summary>
    [SuppressMessage("Microsoft.Security", "CA2104:DoNotDeclareReadOnlyMutableReferenceTypes")]
    public static readonly Func<int, TimeSpan> ExponentialBackoff = n => TimeSpan.FromSeconds(Math.Pow(n, 2));

    /// <summary>
    /// A linear strategy which starts with 1 second and then 2, 3, 4...
    /// </summary>
    [SuppressMessage("Microsoft.Security", "CA2104:DoNotDeclareReadOnlyMutableReferenceTypes")]
    public static readonly Func<int, TimeSpan> LinearStrategy = n => TimeSpan.FromSeconds(1*n);

    /// <summary>
    /// Returns a cold observable which retries (re-subscribes to) the source observable on error up to the 
    /// specified number of times or until it successfully terminates. Allows for customizable back off strategy.
    /// </summary>
    /// <param name="source">The source observable.</param>
    /// <param name="retryCount">The number of attempts of running the source observable before failing.</param>
    /// <param name="strategy">The strategy to use in backing off, exponential by default.</param>
    /// <param name="retryOnError">A predicate determining for which exceptions to retry. Defaults to all</param>
    /// <param name="scheduler">The scheduler.</param>
    /// <returns>
    /// A cold observable which retries (re-subscribes to) the source observable on error up to the 
    /// specified number of times or until it successfully terminates.
    /// </returns>
    [SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
    public static IObservable<T> RetryWithBackoffStrategy<T>(
        this IObservable<T> source,
        int retryCount = 3,
        Func<int, TimeSpan> strategy = null,
        Func<Exception, bool> retryOnError = null,
        IScheduler scheduler = null)
    {
        strategy = strategy ?? ExponentialBackoff;
        scheduler = scheduler ?? Scheduler;

        if (retryOnError == null)
            retryOnError = e => true;

        int attempt = 0;

        return Observable.Defer(() =>
        {
            return ((++attempt == 1) ? source : source.DelaySubscription(strategy(attempt - 1), scheduler))
                .Select(item => new Tuple<bool, T, Exception>(true, item, null))
                .Catch<Tuple<bool, T, Exception>, Exception>(e => retryOnError(e)
                    ? Observable.Throw<Tuple<bool, T, Exception>>(e)
                    : Observable.Return(new Tuple<bool, T, Exception>(false, default(T), e)));
        })
        .Retry(retryCount)
        .SelectMany(t => t.Item1
            ? Observable.Return(t.Item2)
            : Observable.Throw<T>(t.Item3));
    }
}

Now to test how it works, I've written this small program:

class Program
{
    static void Main(string[] args)
    {
        int tryCount = 0;
        var cts = new CancellationTokenSource();

        var sched = new TaskPoolScheduler(new TaskFactory());
        var source = Observable.Defer(
            () =>
            {
                Console.WriteLine("Action {0}", tryCount);
                var a = 5/tryCount++;
                return Observable.Return("yolo");
            });

        source.RetryWithBackoffStrategy(scheduler: sched, strategy: Extensions.LinearStrategy, retryOnError: exception => exception is DivideByZeroException);

        while (!cts.IsCancellationRequested)
            source.Subscribe(
                res => { Console.WriteLine("Result: {0}", res); },
                ex =>
                {
                    Console.WriteLine("Error: {0}", ex.Message); 

                },
                () =>
                {
                    cts.Cancel();
                    Console.WriteLine("End Processing after {0} attempts", tryCount);
                });
    }
}

Initially I have thought, that the event of subscription, will automatically trigger all the subsequent retires. That was not the case, so I had to implement a Cancellation Token and loop until it signals that all reties have been exhausted.

The other option is to use AutoResetEvent:

class Program
{
    static void Main(string[] args)
    {
        int tryCount = 0;
        var auto = new AutoResetEvent(false);

        var source = Observable.Defer(
            () =>
            {
                Console.WriteLine("Action {0}", tryCount);
                var a = 5/tryCount++;
                return Observable.Return("yolo");
            });

        source.RetryWithBackoffStrategy(strategy: Extensions.LinearStrategy, retryOnError: exception => exception is DivideByZeroException);

        while (!auto.WaitOne(1))
        {
            source.Subscribe(
                res => { Console.WriteLine("Result: {0}", res); },
                ex =>
                {
                    Console.WriteLine("Error: {0}", ex.Message);
                },
                () =>
                {
                    Console.WriteLine("End Processing after {0} attempts", tryCount);
                    auto.Set();
                });
        }
    }
}

In both scenarios it will display these lines:

Action 0
Error: Attempted to divide by zero.
Action 1
Result: yolo
End Processing after 2 attempts

The question I have to this crowd is: Is this the best way to use this extension? Or is there a way to subscribe to the Observable so it will re-fire itself, up to the number of retries?

FINAL UPDATE

Based on Brandon's suggestion, this is the proper way of subscribing:

internal class Program
{
    #region Methods

    private static void Main(string[] args)
    {
        int tryCount = 0;
        IObservable<string> source = Observable.Defer(
            () =>
            {
                Console.WriteLine("Action {0}", tryCount);
                int a = 5 / tryCount++;
                return Observable.Return("yolo");
            });
        source.RetryWithBackoffStrategy(strategy: Extensions.ExponentialBackoff, retryOnError: exception => exception is DivideByZeroException, scheduler: Scheduler.Immediate)
            .Subscribe(
                res => { Console.WriteLine("Result: {0}", res); },
                ex => { Console.WriteLine("Error: {0}", ex.Message); },
                () =>
                {
                    Console.WriteLine("End Processing after {0} attempts", tryCount);
                });
    }

    #endregion
}

The output will be slightly different:

Action 0
Action 1
Result: yolo
End Processing after 2 attempts

This turned out to be quite useful extension. Here is another example how it can be used, where strategy and error processing is given using delegates.

internal class Program
{
    #region Methods

    private static void Main(string[] args)
    {
        int tryCount = 0;
        IObservable<string> source = Observable.Defer(
            () =>
            {
                Console.WriteLine("Action {0}", tryCount);
                int a = 5 / tryCount++;
                return Observable.Return("yolo");
            });
        source.RetryWithBackoffStrategy(
            strategy: i => TimeSpan.FromMilliseconds(1),
            retryOnError: exception =>
            {
                if (exception is DivideByZeroException)
                {
                    Console.WriteLine("Tried to divide by zero");
                    return true;
                }
                return false;
            },
            scheduler: Scheduler.Immediate).Subscribe(
                res => { Console.WriteLine("Result: {0}", res); },
                ex => { Console.WriteLine("Error: {0}", ex.Message); },
                () =>
                {
                    Console.WriteLine("Succeeded after {0} attempts", tryCount);
                });
    }

    #endregion
}

Output:

Action 0
Tried to divide by zero
Action 1
Result: yolo
Succeeded after 2 attempts

1条回答
叛逆
2楼-- · 2019-06-12 13:11

Yeah Rx is generally asynchronous so when writing tests, you need to wait for it to finish (otherwise Main just exits right after your call to Subscribe).

Also, make sure you subscribe to the observable produced by calling source.RetryWithBackoffStrategy(...). That produces a new observable that has the retry semantics.

Easiest solution in cases like this is to literally use Wait:

try
{
  var source2 = source.RetryWithBackoffStrategy(/*...*/);

  // blocks the current thread until the source finishes
  var result = source2.Wait(); 
  Console.WriteLine("result=" + result);
}
catch (Exception err)
{
  Console.WriteLine("uh oh", err);
}

If you use something like NUnit (which supports asynchronous tests) to write your tests, then you can do:

[Test]
public async Task MyTest()
{
    var source = // ...;
    var source2 = source.RetryWithBackoffStrategy(/*...*/);
    var result = await source2; // you can await observables
    Assert.That(result, Is.EqualTo(5));
}
查看更多
登录 后发表回答