I'm learning Reactive Extensions (Rx) in .NET and I'm struggling a little bit with what a "subscription" really is and when it is used.
Let's take some sample data, borrowed from this thread:
using System;
using System.Reactive.Linq;
using System.Threading;

namespace ConsoleApp1
{
    class Program
    {
        class Result
        {
            public bool Flag { get; set; }
            public string Text { get; set; }
        }

        static void Main(string[] args)
        {
            var source =
                Observable.Create<Result>(f =>
                {
                    Console.WriteLine("Start creating data!");
                    f.OnNext(new Result() { Text = "one", Flag = false });
                    Thread.Sleep(1000);
                    f.OnNext(new Result() { Text = "two", Flag = true });
                    Thread.Sleep(1000);
                    f.OnNext(new Result() { Text = "three", Flag = false });
                    Thread.Sleep(1000);
                    f.OnNext(new Result() { Text = "four", Flag = false });
                    Thread.Sleep(1000);
                    f.OnNext(new Result() { Text = "five", Flag = true });
                    Thread.Sleep(1000);
                    f.OnNext(new Result() { Text = "six", Flag = true });
                    Thread.Sleep(1000);
                    f.OnNext(new Result() { Text = "seven", Flag = true });
                    Thread.Sleep(1000);
                    f.OnNext(new Result() { Text = "eight", Flag = false });
                    Thread.Sleep(1000);
                    f.OnNext(new Result() { Text = "nine", Flag = true });
                    Thread.Sleep(1000);
                    f.OnNext(new Result() { Text = "ten", Flag = false });
                    return () => Console.WriteLine("Observer has unsubscribed");
                });
        }
    }
}
Note the line:
Console.WriteLine("Start creating data!");
At first I thought a subscription is simply what you get by calling the .Subscribe operator. So an observer (e.g. the callback passed to .Subscribe) subscribes to an observable (the return value of the last operator in a chain), like this (just as an example, the query doesn't have a real use):
source.Zip(source, (s1, s0) =>
        s0.Flag
            ? Observable.Return(s1)
            : Observable.Empty<Result>())
    .Merge()
    .Subscribe(f => { Console.WriteLine(f.Text); });
Now I was expecting to get the "Start creating data!" output only once, since I was only using one subscription. But in fact, I got it twice:
Start creating data!
Start creating data!
two
five
six
seven
nine
I was told that every time I use an operator on source, a subscription is made. But in this example I'm calling an operator on source only once, and using it a second time just as a parameter for the .Zip operator. Or is source subscribed to again because it is passed to the .Zip function by value?
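To isolate the behaviour, here is a reduced sketch of the same situation without the delays. It assumes the System.Reactive package is referenced; the names SubscriptionCount, Run and runs are made up for illustration and are not part of Rx. It counts how often the delegate passed to Observable.Create runs when a single .Subscribe call is made on a query that mentions source twice.

```csharp
using System;
using System.Reactive.Linq;

static class SubscriptionCount
{
    // Returns how many times the Create delegate ran for ONE Subscribe call.
    public static int Run()
    {
        int runs = 0; // hypothetical counter, incremented per subscription to source

        var source = Observable.Create<int>(o =>
        {
            runs++;
            Console.WriteLine("Start creating data!");
            o.OnNext(1);
            o.OnNext(2);
            o.OnCompleted();
            return () => { }; // nothing to clean up in this sketch
        });

        // One explicit Subscribe, but source appears twice in the query.
        source.Zip(source, (a, b) => a + b)
              .Subscribe(x => Console.WriteLine(x));

        return runs;
    }

    static void Main() => Console.WriteLine(Run());
}
```

In this sketch Run() should return 2, which matches the two "Start creating data!" lines above: the counter follows the number of times source appears as an input to the query, not the number of .Subscribe calls I wrote.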
So my questions are:
- What exactly is a "subscription" in terms of Rx?
- Where / why exactly do the two subscriptions happen in my example?
Btw, I know I can prevent multiple subscriptions from happening by using the .Publish operator, but that isn't within the scope of my questions.
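For completeness, a minimal sketch of the .Publish variant mentioned above, again assuming the System.Reactive package; SharedSubscription, Run and runs are hypothetical names used only for illustration. It uses the Publish overload that takes a selector, so both inputs of .Zip read from one shared subscription to source.

```csharp
using System;
using System.Reactive.Linq;

static class SharedSubscription
{
    // Returns how many times the Create delegate ran when source is shared via Publish.
    public static int Run()
    {
        int runs = 0; // hypothetical counter, incremented per subscription to source

        var source = Observable.Create<int>(o =>
        {
            runs++;
            Console.WriteLine("Start creating data!");
            o.OnNext(1);
            o.OnNext(2);
            o.OnCompleted();
            return () => { }; // nothing to clean up in this sketch
        });

        // Inside the selector, 'shared' stands for a single subscription to source.
        source.Publish(shared => shared.Zip(shared, (a, b) => a + b))
              .Subscribe(x => Console.WriteLine(x));

        return runs;
    }

    static void Main() => Console.WriteLine(Run());
}
```

Here Run() should return 1. Note that, as far as I understand, the semantics differ from the unshared query: both .Zip inputs see the same element at the same moment, so each element ends up paired with itself.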