非重放观察到热(Non replaying hot observable)

2019-07-29 19:38发布

原来的问题

我有一种情况,我有多个IObservable我希望与结合序列Merge ,然后听。 但是,如果这些中的一个产生的错误,我不希望它崩溃一切为了其他流,以及重新订阅序列(这是一个“永远持久的”序列)。

我这样做是通过附加Retry()进行合并,即前流:

IEnumerable<IObservable<int>> observables = GetObservables();

observables
    .Select(o => o.Retry())
    .Merge()
    .Subscribe(/* Do subscription stuff */);

然而,当我想测试这个问题就出现了。 我想测试的是,如果其中一个IObservable S IN observables产生OnError ,其他的人应该还是可以通过发送他们的价值观和他们应该得到处理

我想我只是用两个Subject<int>代表中国2 IObservable以s observables ; 一个发送OnError(new Exception())和其他的,在此之后,发送OnNext(1) 然而,这似乎Subject<int>将重播新订阅所有先前值(有效Retry()是),把测试进入一个无限循环。

我试图通过创建一个手动来解决它IObservable产生第一签约上,后来空序列错误,但感觉哈克:

var i = 0;
var nErrors = 2;
var testErrorObservableWithOneErrorAndThenCompletion = Observable.Create<int>(o => {
    i++;
    if (i < nErrors) {
        return Observable.Throw<int>(new Exception()).Subscribe(o);
    } else {
        return Observable.Empty<int>().Subscribe(o);
    }
});

我使用的Subject或想Retry()以错误的方式? 在这个没有其他的想法? 你将如何解决这种情况呢?

更新

好吧,这里是什么,我想而大理石图Retry()一样。

o = message, X = error.
------o---o---X
               \
     Retry() -> \---o---o---X
                             \
                   Retry() -> \...

我的问题是也许更在于我没有好货类使用前测试,因为Subject要重播我以前所有的错误。

更新2

这是一个测试案例,显示了我的意思大约Subject重放其值。 我使用正确如果我说这是否在寒冷的方式术语? 我知道Subject是创造一个热点观察到的一种方式,但还是这种行为感到“冷”给我。

var onNext = false;
var subject = new Subject<int>();

subject.Retry().Subscribe(x => onNext = true);
subject.OnError(new Exception());
subject.OnNext(1);

Assert.That(onNext, Is.True);

Answer 1:

根据您的更新的要求(要重试失败,而不是只是想忽略他们的观测),我们可以拿出一个可行的解决方案。

首先,要了解一个冷观察到(重建每个订阅)和热可观测(无论存在订阅)之间的区别是很重要的。 你不能Retry()热可观的,因为它不知道如何重新创建底层的事件。 也就是说,如果一个热点观察到的错误,它已经一去不复返了。

Subject创建一个热点观察到,在这你可以拨打感OnNext ,而无需用户,并如预期它会采取行动。 到热可观察转换为冷观察到的,你可以使用Observable.Defer ,其中将包含“上订购创建”的可观察的逻辑。

所有这一切说,这里的修改,以做到这一点原来的代码:

var success = new Subject<int>();
var error = new Subject<int>();

var observables = new List<IObservable<int>> { Observable.Defer(() => {success = new Subject<int>(); return success.AsObservable();}), 
                                               Observable.Defer(() => {error = new Subject<int>(); return error.AsObservable();}) };                                            

observables
.Select(o => o.Retry())
.Merge()
.Subscribe(Console.WriteLine, Console.WriteLine, () => Console.WriteLine("done"));

和测试(类似前):

success.OnNext(1);
error.OnError(new Exception("test"));
success.OnNext(2);
error.OnNext(-1);
success.OnCompleted();
error.OnCompleted();

正如预期的输出:

1
2
-1
done

当然,你需要修改这个概念显著这取决于你观察到的基本是。 使用科目的测试是不一样的使用他们的真实。

我也想指出,此评论:

然而,似乎主题将重播以前的所有值一个新的订阅(有效地重试()是),把测试进入一个无限循环。

是不是真的- Subject不行为这种方式。 还有是基于这样的事实引起无限循环代码的一些其他方面Retry重新创建订阅和订阅在某些时候产生一个错误。


原来的答复 (完成)

问题是, Retry()你想要它做的事情没有做。 从这里:

http://msdn.microsoft.com/en-us/library/ff708141(v=vs.92).aspx

重复用于RetryCount重次,直到它成功终止的源可观察序列。

这意味着, Retry会不断地尝试重新连接到底层观察到,直到成功,并不会引发错误。

我的理解是你真正想要的观察到的异常被忽略 ,而不是重试。 这将做你想要什么,而不是:

observables
.Select(o => o.Catch((Func<Exception,IObservable<int>>)(e => Observable.Empty<int>())))
.Merge()
.Subscribe(/* subscription code */);

它使用Catch陷阱可观察与异常,并在该点空观测的更换。

下面是使用科目全面测试:

var success = new Subject<int>();
var error = new Subject<int>();

var observables = new List<IObservable<int>> { success.AsObservable(), error.AsObservable() };

observables
.Select(o => o.Catch((Func<Exception,IObservable<int>>)(e => Observable.Empty<int>())))
.Merge()
.Subscribe(Observer.Create<int>(Console.WriteLine, Console.WriteLine, () => Console.WriteLine("done")));

success.OnNext(1);
error.OnError(new Exception("test"));
success.OnNext(2);
success.OnCompleted();

并且这产生,符合市场预期:

1
2
done


文章来源: Non replaying hot observable