Original question
I have a scenario where I have multiple IObservable
sequences which I want to combine with Merge
and then listen to. However, if one of these produces an error I don't want it to crash everything for the other streams, as well as to resubscribe to the sequence (this is an 'ever lasting' sequence).
I do this by appending a Retry()
to the streams before merge, i.e.:
IEnumerable<IObservable<int>> observables = GetObservables();
observables
.Select(o => o.Retry())
.Merge()
.Subscribe(/* Do subscription stuff */);
However, the problem arises when I want to test this. What I would like to test is that if one of the IObservable
s in observables
produces an OnError
, the other ones should still be able to send their values through and they should get handled
I thought I'd just use two Subject<int>
s representing two IObservable
s in observables
; one sending an OnError(new Exception())
and the other, after that, sending OnNext(1)
. However, it seems Subject<int>
will replay all previous values for a new subscription (which effectively Retry()
is), turning the test into an infinite loop.
I tried to solve it by creating a manual IObservable
that produces an error on the first subscription and later an empty sequence, but it feels hacky:
var i = 0;
var nErrors = 2;
var testErrorObservableWithOneErrorAndThenCompletion = Observable.Create<int>(o => {
i++;
if (i < nErrors) {
return Observable.Throw<int>(new Exception()).Subscribe(o);
} else {
return Observable.Empty<int>().Subscribe(o);
}
});
Am I using Subject
or thinking about Retry()
in the wrong way? Any other thoughts on this? How would you solve this situation?
Update
Ok, here's a marble diagram of what I want and think Retry()
does.
o = message, X = error.
------o---o---X
\
Retry() -> \---o---o---X
\
Retry() -> \...
My problem is perhaps more in that I don't have a good stock class to use fore testing, since Subject
wants to replay all of my previous errors.
Update 2
Here's a test case that shows what I mean about Subject
replaying its values. Am I using the term correctly if I say it does this in a cold way? I know Subject
is a way of creating a hot observable, but still this behavior feels 'cold' to me.
var onNext = false;
var subject = new Subject<int>();
subject.Retry().Subscribe(x => onNext = true);
subject.OnError(new Exception());
subject.OnNext(1);
Assert.That(onNext, Is.True);
Based on your updated requirements (you want to retry the observables that fail, rather than just wanting to ignore them), we can come up with a solution that works.
First, it's important to understand the difference between a cold observable (recreated on every subscription) and a hot observable (exists regardless of subscriptions). You can't
Retry()
a hot observable, as it won't know how to recreate the underlying events. That is, if a hot observable errors, it's gone forever.Subject
creates a hot observable, in the sense that you can callOnNext
without having subscribers and it will act as expected. To convert a hot observable to a cold observable, you can useObservable.Defer
, which will contain the 'creation on subscription' logic for that observable.All that said, here's the original code modified to do this:
And the test (similar to before):
And the output as expected:
Of course, you'll need to modify this concept significantly depending on what you're underlying observable is. Using subjects for testing is not the same as using them for real.
I also want to note that this comment:
Is not true -
Subject
doesn't behave this way. There is some other aspect of your code that is causing the infinite loop based on the fact thatRetry
recreates the subscription, and the subscription creates an error at some point.Original answer (for completion)
The issue is that
Retry()
doesn't do what you want it to do. From here:http://msdn.microsoft.com/en-us/library/ff708141(v=vs.92).aspx
This means that
Retry
will continually try and reconnect to the underlying observable until it succeeds and doesn't throw an error.My understanding is that you actually want exceptions in the observable to be ignored, not retried. This will do what you want instead:
This uses
Catch
to trap the observable with an exception, and replace it with an empty observable at that point.Here is a full test using subjects:
And this produces, as expected: