Merging multiple custom observables in RX

2019-09-14 19:17发布

问题:

Trying to model a system sending out notifications from a number of publishers using RX.

I have two custom interfaces ITopicObservable and ITopicObserver to model the fact that the implementing classes will have other properties and methods apart from the IObservable and IObserver interfaces.

The problem I have is that my thinking is I should be able to add a number of observables together, merge them together and subscribe to an observer to provide updates from all merged observables. However the code with "the issue" comment throws an invalid cast exception.

The use case is a number of independent sensors each monitoring a temperature in a box for example that aggregate all their reports to one temperature report which is then subscribed to by a temperature health monitor.

What am I missing here? Or is there a better way to implement the scenario using RX?

Code below

using System;
using System.Reactive.Linq;
using System.Collections.Generic;

namespace test
{
class MainClass
{
    public static void Main (string[] args)
    {
        Console.WriteLine ("Hello World!");
        var to = new TopicObserver ();
        var s = new TopicObservable ("test");

        var agg = new AggregatedTopicObservable ();
        agg.Add (s);

        agg.Subscribe (to);
    }
}

public interface ITopicObservable<TType>:IObservable<TType>
{
    string Name{get;}
}

public class TopicObservable:ITopicObservable<int>
{
    public TopicObservable(string name)
    {
        Name = name;
    }
    #region IObservable implementation
    public IDisposable Subscribe (IObserver<int> observer)
    {
        return null;
    }
    #endregion
    #region ITopicObservable implementation
    public string Name { get;private set;}

    #endregion
}

public class AggregatedTopicObservable:ITopicObservable<int>
{
    List<TopicObservable> _topics;
    private ITopicObservable<int> _observable;
    private IDisposable _disposable;

    public AggregatedTopicObservable()
    {
        _topics = new List<TopicObservable>();
    }

    public void Add(ITopicObservable<int> observable)
    {
        _topics.Add ((TopicObservable)observable);
    }

    #region IObservable implementation
    public IDisposable Subscribe (IObserver<int> observer)
    {
        _observable = (ITopicObservable<int>)_topics.Merge ();

        _disposable = _observable.Subscribe(observer);

        return _disposable;
    }
    #endregion
    #region ITopicObservable implementation
    public string Name { get;private set;}
    #endregion

}



public interface ITopicObserver<TType>:IObserver<TType>
{
    string Name{get;}
}

public class TopicObserver:ITopicObserver<int>
{
    #region IObserver implementation
    public void OnNext (int value)
    {
        Console.WriteLine ("next {0}", value);
    }
    public void OnError (Exception error)
    {
        Console.WriteLine ("error {0}", error.Message);
    }
    public void OnCompleted ()
    {
        Console.WriteLine ("finished");
    }
    #endregion
    #region ITopicObserver implementation
    public string Name { get;private set;}
    #endregion

}
}

回答1:

The signature of the .Merge(...) operator that you're using is:

IObservable<TSource> Merge<TSource>(this IEnumerable<IObservable<TSource>> sources)

The actual type returned by this .Merge() is:

System.Reactive.Linq.ObservableImpl.Merge`1[System.Int32]

...so it should be fairly clear that calling (ITopicObservable<int>)_topics.Merge(); would fail.

Lee's advice not to implement either of IObservable<> or IObserver<> is the correct one. It leads to errors like the one above.

If you had to do something like this, I would do it this way:

public interface ITopic
{
    string Name { get; }
}

public interface ITopicObservable<TType> : ITopic, IObservable<TType>
{ }

public interface ITopicSubject<TType> : ISubject<TType>, ITopicObservable<TType>
{ }

public interface ITopicObserver<TType> : ITopic, IObserver<TType>
{ }

public class Topic
{
    public string Name { get; private set; }

    public Topic(string name)
    {
        this.Name = name;
    }
}

public class TopicSubject : Topic, ITopicSubject<int>
{
    private Subject<int> _subject = new Subject<int>();

    public TopicSubject(string name)
        : base(name)
    { }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        return _subject.Subscribe(observer);
    }

    public void OnNext(int value)
    {
        _subject.OnNext(value);
    }

    public void OnError(Exception error)
    {
        _subject.OnError(error);
    }

    public void OnCompleted()
    {
        _subject.OnCompleted();
    }
}

public class AggregatedTopicObservable : Topic, ITopicObservable<int>
{
    List<ITopicObservable<int>> _topics = new List<ITopicObservable<int>>();

    public AggregatedTopicObservable(string name)
        : base(name)
    { }

    public void Add(ITopicObservable<int> observable)
    {
        _topics.Add(observable);
    }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        return _topics.Merge().Subscribe(observer);
    }
}

public class TopicObserver : Topic, ITopicObserver<int>
{
    private IObserver<int> _observer;

    public TopicObserver(string name)
        : base(name)
    {
        _observer =
            Observer
                .Create<int>(
                    value => Console.WriteLine("next {0}", value),
                    error => Console.WriteLine("error {0}", error.Message),
                    () => Console.WriteLine("finished"));
    }

    public void OnNext(int value)
    {
        _observer.OnNext(value);
    }
    public void OnError(Exception error)
    {
        _observer.OnError(error);
    }
    public void OnCompleted()
    {
        _observer.OnCompleted();
    }
}

And run it with:

var to = new TopicObserver("watching");
var ts1 = new TopicSubject("topic 1");
var ts2 = new TopicSubject("topic 2");

var agg = new AggregatedTopicObservable("agg");

agg.Add(ts1);
agg.Add(ts2);

agg.Subscribe(to);

ts1.OnNext(42);
ts1.OnCompleted();

ts2.OnNext(1);
ts2.OnCompleted();

Which gives:

next 42
next 1
finished

But apart from being able to give everything a name (which I'm not sure how it helps) you could always do this:

var to =
    Observer
        .Create<int>(
            value => Console.WriteLine("next {0}", value),
            error => Console.WriteLine("error {0}", error.Message),
            () => Console.WriteLine("finished"));

var ts1 = new Subject<int>();
var ts2 = new Subject<int>();

var agg = new [] { ts1, ts2 }.Merge();

agg.Subscribe(to);

ts1.OnNext(42);
ts1.OnCompleted();

ts2.OnNext(1);
ts2.OnCompleted();

Same output with no interfaces and classes.

There's even a more interesting way. Try this:

var to =
    Observer
        .Create<int>(
            value => Console.WriteLine("next {0}", value),
            error => Console.WriteLine("error {0}", error.Message),
            () => Console.WriteLine("finished"));

var agg = new Subject<IObservable<int>>();

agg.Merge().Subscribe(to);

var ts1 = new Subject<int>();
var ts2 = new Subject<int>();

agg.OnNext(ts1);
agg.OnNext(ts2);

ts1.OnNext(42);
ts1.OnCompleted();

ts2.OnNext(1);
ts2.OnCompleted();

var ts3 = new Subject<int>();

agg.OnNext(ts3);

ts3.OnNext(99);
ts3.OnCompleted();

This produces:

next 42
next 1
next 99

It allows you to add new source observables after the merge!



回答2:

My first thought, is that you shouldn't implement IObservable<T>, you should compose it by exposing it as a property or the result of a method.

Second thought is that there are operators in Rx that excel at merging/aggregating multiple sequences together. You should favor using those.

Third, which is similar to the first, you generally don't implement IObserver<T>, you just subscribe to the observable sequence and provide delegates for each call back (OnNext, OnError and OnComplete)

So your code basically is reduced to

Console.WriteLine("Hello World!");
var topic1 = TopicListener("test1");
var topic2 = TopicListener("test2");

topic1.Merge(topic2)
    .Subscribe(
    val => { Console.WriteLine("One of the topics published this value {0}", val);},
    ex => { Console.WriteLine("One of the topics errored. Now the whole sequence is dead {0}", ex);},
    () => {Console.WriteLine("All topics have completed.");});

Where TopicListener(string) is just a method that returns IObservable<T>. The implementation of the TopicListener(string) method would most probably use Observable.Create.

It may help to see examples of mapping Rx over a Topic based messaging system. There is an example of how you can layer Rx over TibRv topics here https://github.com/LeeCampbell/RxCookbook/blob/master/IO/Comms/TibRvSample.linq