Rx: What are subscriptions and how do subscription

2019-08-15 09:17发布

问题:

I'm learning reactive extensions (rx) in .NET and I'm struggling a little bit with what a "subscription" really is and when it is used.

Lets take some sample data, taken from this thread:

using System;
using System.Reactive.Linq;
using System.Threading;

namespace ConsoleApp1
{
    class Program
    {
        class Result
        {
            public bool Flag { get; set; }
            public string Text { get; set; }
        }

        static void Main(string[] args)
        {               
            var source =
               Observable.Create<Result>(f =>
               {
                   Console.WriteLine("Start creating data!");

                   f.OnNext(new Result() { Text = "one", Flag = false });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "two", Flag = true });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "three", Flag = false });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "four", Flag = false });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "five", Flag = true });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "six", Flag = true });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "seven", Flag = true });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "eight", Flag = false });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "nine", Flag = true });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "ten", Flag = false });

                   return () => Console.WriteLine("Observer has unsubscribed");
               });
        }
    }
}

Beware of the line:

 Console.WriteLine("Start creating data!");

Now, first I thought a subscription simply is used by using the .Subscribe operator. So an observer (e.g. the callback of the .Subscribe function) subscribes to an observable (the last return value of a chain of operators) like this (just as an example, the query doesn't have a real use):

  source.Zip(source, (s1, s0) =>
     s0.Flag
     ? Observable.Return(s1)
     : Observable.Empty<Result>()).Merge().Subscribe(f => { Console.WriteLine(f.Text); });

Now I was expecting to get the "Start creating data!" output only once, since I was only using one subscription. But in fact, I got it twice:

Start creating data!
Start creating data!
two
five
six
seven
nine

I was told that everytime I use an operator on source., a subscription is made. But in this example I'm using source. only once and then a second time just as a parameter for the .Zip operator. Or is it because the source is passed to the .Zip function by value subscribed again?

So my questions are:

  1. What exactly is a "subscription" in terms of Rx?
  2. Where / why exactly do the two subscriptions happen on my example?

Btw. I know I can prevent multiple subscriptions from happening by using the .Publish operator, but that isn't the scope of my questions.

回答1:

In simple terms a Subscription just represents an Observable that has been subscribed to. This process can happen either explicitly by using .Subscribe or implicitly by joining two or more Observables and then subscribing to the resulting chain.

In your case you are seeing both happen, once explicitly when you call Subscribe and one implicitly when you pass source to Zip, that is, there are two Subscriptions to the source Observable.

Why is that important? Because by default Observables are lazy, meaning that they will not begin processing until you subscribe to them (the product of that process being a Subscription), by extension this means that any time you subscribe to the Observable it will effectively begin a new stream. This behavior can be overridden like you alluded to with Publish, but the default is for each Observable to be cold.

In your specific case, since you are passing the same Observable to Zip it needs to subscribe to it twice, since it will be zipping together events from the two passed streams. The result is two subscriptions to the same Observable which each run independently of each other.