Observable.Retry doesn't work as expected

2019-06-01 05:40发布

I have a sequence of numbers that are processed using an async method. I'm simulating a remote service call that may fail. In case of failure, I would like to retry until the call successes.

The problem is that with the code I'm trying, every time an exception is thrown in the async method, the sequence seems to hang forever.

You can test it with this simple code snippet (it's tested in LINQPad)

Random rnd = new Random();

void Main()
{
    var numbers = Enumerable.Range(1, 10).ToObservable();
    var processed = numbers.SelectMany(n => Process(n).ToObservable().Retry());
    processed.Subscribe( f => Console.WriteLine(f));
}

public async Task<int> Process(int n)
{
    if (rnd.Next(2) == 1)
    {
        throw new InvalidOperationException();
    }

    await Task.Delay(2000);
    return n*10;    
}

It should process every element, retrying the ones that have failed. Instead, it never ends and I don't know why.

How can I make it to do what I want?

EDIT: (thanks @CharlesNRice and @JonSkeet for the clues!):

This works!

Random rnd = new Random();

void Main()
{
    var numbers = Enumerable.Range(1, 10).ToObservable();
    var processed = numbers.SelectMany(n => RetryTask(() => MyTask(n)).ToObservable());
    processed.Subscribe(f => Console.WriteLine(f));
}

private async Task<int> MyTask(int n)
{
    if (rnd.Next(2) == 1)
    {
        throw new InvalidOperationException();
    }

    await System.Threading.Tasks.Task.Delay(2000);
    return n * 10;
}

async Task<T> RetryTask<T>(Func<Task<T>> myTask, int? retryCount = null)
{
    while (true)
    {
        try
        {
            return await myTask();
        }
        catch (Exception)
        {
            Debug.WriteLine("Retrying...");


            if (retryCount.HasValue)
            {
                if (retryCount == 0)
                {
                    throw;
                }

                retryCount--;
            }
        }
    }
}

2条回答
干净又极端
2楼-- · 2019-06-01 06:23

You are retrying back to the same Task that is in a faulted state. Retry will resubscribe back to the observable source. The source of your retry is the ToObservable(). It will not act like a task factory and make a new Task and since the task is faulted it continues to retry on the faulted task and will never be successful.

You can check out this answer how to make your own retry wrapper https://stackoverflow.com/a/6090049/1798889

查看更多
叼着烟拽天下
3楼-- · 2019-06-01 06:36

Rolling your own Retry is overkill in this case. You can achieve the same thing by simply wrapping your method call in a Defer block and it will be re-executed when the retry occurs.

var numbers = Enumerable.Range(1, 10).ToObservable();

var processed = numbers.SelectMany(n => 
  //Defer call passed method every time it is subscribed to,
  //Allowing the Retry to work correctly.
  Observable.Defer(() => 
    Process(n).ToObservable()).Retry()
);

processed.Subscribe( f => Console.WriteLine(f));
查看更多
登录 后发表回答