waiting IObservable to get all elements error

2019-05-12 00:19发布

问题:

I have this class:

public class TestService
{
     public IObservable<int> GetObservable(int max)
     {
         var subject = new Subject<int>();
         Task.Factory.StartNew(() =>
                               {
                                   for (int i = 0; i < max; i++)
                                   {
                                       subject.OnNext(i);
                                   }
                                   subject.OnCompleted();

                               });
         return subject;
     }
}

I wrote a test method for this as well:

[TestMethod]
public void TestServiceTest1()
{
   var testService = new TestService();
   var i = 0;
   var observable = testService.GetObservable(3);
   observable.Subscribe(_ =>
   {
      i++;
   });          
   observable.Wait();
   Assert.AreEqual(i, 3);
}

But sometimes I get the error: Sequence contains no elements in method Wait().

I propose that my IObservable is completed before test reaches the observable.Wait() line. How can I avoid this error?

回答1:

It seems to me that the basic problem in this code is that an IObservable represents a contract of how to observe something.

In this case, the GetObservable method is not just returning a contract, it is performing work (with TPL) right away. This does not make sense if you consider that you can subscribe multiple times to the same IObservable instance (which is in fact happening in the sample code, as you're subscribing a first time with Subscribe and another with Wait). This single distinction was the biggest stumbling block for me in learning Rx.

I would instead implement the method like this (and avoid altogether using Subject<>, since it is not the preferred way of creating an Observable):

public class TestService
{
     public IObservable<int> GetObservable(int max)
     {
         return Observable.Create<int>((IObserver<int> observer) =>
                               {
                                   for (int i = 0; i < max; i++)
                                   {
                                       observer.OnNext(i);
                                   }
                                   observer.OnCompleted();
                               });
     }
}

There are also interesting overrides to Observable.Create which provide more flexibility.