原来的问题
我有一种情况,我有多个IObservable
我希望与结合序列Merge
,然后听。 但是,如果这些中的一个产生的错误,我不希望它崩溃一切为了其他流,以及重新订阅序列(这是一个“永远持久的”序列)。
我这样做是通过附加Retry()
进行合并,即前流:
IEnumerable<IObservable<int>> observables = GetObservables();
observables
.Select(o => o.Retry())
.Merge()
.Subscribe(/* Do subscription stuff */);
然而,当我想测试这个问题就出现了。 我想测试的是,如果其中一个IObservable
S IN observables
产生OnError
,其他的人应该还是可以通过发送他们的价值观和他们应该得到处理
我想我只是用两个Subject<int>
代表中国2 IObservable
以s observables
; 一个发送OnError(new Exception())
和其他的,在此之后,发送OnNext(1)
然而,这似乎Subject<int>
将重播新订阅所有先前值(有效Retry()
是),把测试进入一个无限循环。
我试图通过创建一个手动来解决它IObservable
产生第一签约上,后来空序列错误,但感觉哈克:
var i = 0;
var nErrors = 2;
var testErrorObservableWithOneErrorAndThenCompletion = Observable.Create<int>(o => {
i++;
if (i < nErrors) {
return Observable.Throw<int>(new Exception()).Subscribe(o);
} else {
return Observable.Empty<int>().Subscribe(o);
}
});
我使用的Subject
或想Retry()
以错误的方式? 在这个没有其他的想法? 你将如何解决这种情况呢?
更新
好吧,这里是什么,我想而想大理石图Retry()
一样。
o = message, X = error.
------o---o---X
\
Retry() -> \---o---o---X
\
Retry() -> \...
我的问题是也许更在于我没有好货类使用前测试,因为Subject
要重播我以前所有的错误。
更新2
这是一个测试案例,显示了我的意思大约Subject
重放其值。 我使用正确如果我说这是否在寒冷的方式术语? 我知道Subject
是创造一个热点观察到的一种方式,但还是这种行为感到“冷”给我。
var onNext = false;
var subject = new Subject<int>();
subject.Retry().Subscribe(x => onNext = true);
subject.OnError(new Exception());
subject.OnNext(1);
Assert.That(onNext, Is.True);
根据您的更新的要求(要重试失败,而不是只是想忽略他们的观测),我们可以拿出一个可行的解决方案。
首先,要了解一个冷观察到(重建每个订阅)和热可观测(无论存在订阅)之间的区别是很重要的。 你不能Retry()
热可观的,因为它不知道如何重新创建底层的事件。 也就是说,如果一个热点观察到的错误,它已经一去不复返了。
Subject
创建一个热点观察到,在这你可以拨打感OnNext
,而无需用户,并如预期它会采取行动。 到热可观察转换为冷观察到的,你可以使用Observable.Defer
,其中将包含“上订购创建”的可观察的逻辑。
所有这一切说,这里的修改,以做到这一点原来的代码:
var success = new Subject<int>();
var error = new Subject<int>();
var observables = new List<IObservable<int>> { Observable.Defer(() => {success = new Subject<int>(); return success.AsObservable();}),
Observable.Defer(() => {error = new Subject<int>(); return error.AsObservable();}) };
observables
.Select(o => o.Retry())
.Merge()
.Subscribe(Console.WriteLine, Console.WriteLine, () => Console.WriteLine("done"));
和测试(类似前):
success.OnNext(1);
error.OnError(new Exception("test"));
success.OnNext(2);
error.OnNext(-1);
success.OnCompleted();
error.OnCompleted();
正如预期的输出:
1
2
-1
done
当然,你需要修改这个概念显著这取决于你观察到的基本是。 使用科目的测试是不一样的使用他们的真实。
我也想指出,此评论:
然而,似乎主题将重播以前的所有值一个新的订阅(有效地重试()是),把测试进入一个无限循环。
是不是真的- Subject
不行为这种方式。 还有是基于这样的事实引起无限循环代码的一些其他方面Retry
重新创建订阅和订阅在某些时候产生一个错误。
原来的答复 (完成)
问题是, Retry()
你想要它做的事情没有做。 从这里:
http://msdn.microsoft.com/en-us/library/ff708141(v=vs.92).aspx
重复用于RetryCount重次,直到它成功终止的源可观察序列。
这意味着, Retry
会不断地尝试重新连接到底层观察到,直到成功,并不会引发错误。
我的理解是你真正想要的观察到的异常被忽略 ,而不是重试。 这将做你想要什么,而不是:
observables
.Select(o => o.Catch((Func<Exception,IObservable<int>>)(e => Observable.Empty<int>())))
.Merge()
.Subscribe(/* subscription code */);
它使用Catch
陷阱可观察与异常,并在该点空观测的更换。
下面是使用科目全面测试:
var success = new Subject<int>();
var error = new Subject<int>();
var observables = new List<IObservable<int>> { success.AsObservable(), error.AsObservable() };
observables
.Select(o => o.Catch((Func<Exception,IObservable<int>>)(e => Observable.Empty<int>())))
.Merge()
.Subscribe(Observer.Create<int>(Console.WriteLine, Console.WriteLine, () => Console.WriteLine("done")));
success.OnNext(1);
error.OnError(new Exception("test"));
success.OnNext(2);
success.OnCompleted();
并且这产生,符合市场预期:
1
2
done