How do I merge several observables using WhenAny(…

Posted 2019-04-12 14:03

Question:

I have a question which is an extension of the following question raised on this site.

Is there a more elegant way to merge observables when return type is unimportant?

I have an IObservable<Unit> (let's say X), a reactive collection (Y) and a property (Z). The return type is not important. I just want to be notified when any of these change.

I know how to observe all 3 and Subscribe using Observable.Merge as below.

Observable.Merge(X, Y.Changed, ObservableForProperty(Z).Select(_ => Unit.Default)).Subscribe(..)

And it works.

However, when I try to use WhenAny(...,....,....).Subscribe(), the subscription is not triggered when my X changes. What is the syntax for doing the above using WhenAny(...) rather than Observable.Merge(..)?

I prefer to use WhenAny(....) because I am using ReactiveUI in other places.

Example: Say I've got a class derived from ReactiveObject with following properties.

public class AnotherVM : ReactiveObject
{
    public bool IsTrue
    {
        get { return this.isTrue; }
        set { this.RaiseAndSetIfChanged(x => x.isTrue, ref this.isTrue, value); }
    }

    public IObservable<Unit> Data
    {
        get { return this.data; }
    }

    public ReactiveCollection MyCol
    {
        get { return this.myCol; }
    }
}

public class MyVM : ReactiveObject
{
    public MyVM()
    {
        // do WhenAny or Observable.Merge here....
    }
}

I want to observe the above properties of the AnotherVM class using Observable.Merge(..) or WhenAny(...) in the MyVM class. I found that I do not always get a notification in MyVM, whether I subscribe with WhenAny(...) or Merge(...), when any of the three properties changes.

Answer 1:

WhenAny is not for monitoring across sets of arbitrary observables, it's for monitoring the properties of an object supported by ReactiveUI (like a ReactiveObject or reactive collection).

For the general case of combining changes in observable streams, Observable.Merge is the right way to go.

EDIT

I note that you have declared the Data and MyCol properties read-only. If you use a Merge like this:

Observable.Merge(this.WhenAnyValue(o=>o.IsTrue, v=>Unit.Default),
                   this.Data,
                   this.MyCol.CollectionChanged.Select(v=>Unit.Default))

...then you must be careful not to change the backing fields. If you do, the merge stays subscribed to the old instances and you will miss events from the new ones - maybe this is what is happening?

In that case you would need to wire up those properties to RaiseAndSetIfChanged and use a Switch to keep track - e.g. if this.data could change then you would need (I'm using ReactiveUI 5 + .NET 4.5 here in case the RaiseAndSetIfChanged syntax looks odd):

public IObservable<Unit> Data
{
    get { return this.data; }
    private set { this.RaiseAndSetIfChanged(ref data, value); }
}

and your merge would be something like:

Observable.Merge(this.WhenAnyValue(o=>o.IsTrue, v=>Unit.Default),
                   this.WhenAnyObservable(x => x.Data),
                   this.MyCol.CollectionChanged.Select(v=>Unit.Default))

WhenAnyObservable is conceptually equivalent to this:

WhenAny(x => x.Data, vm => vm.Value).Switch()

using Switch to flip over to the latest value of Data when it changes. Don't forget to use the setter to change values of data!
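To make the effect of Switch concrete, here is a minimal sketch that uses only plain Rx (it assumes the System.Reactive package, not ReactiveUI). The BehaviorSubject named dataProperty is a hypothetical stand-in for "the stream of values of the Data property"; it is not what WhenAny does internally.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SwitchSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var first = new Subject<Unit>();
        var second = new Subject<Unit>();

        // Hypothetical stand-in for the Data property: it holds the
        // current inner observable and publishes each replacement.
        var dataProperty = new BehaviorSubject<IObservable<string>>(
            first.Select(_ => "first"));

        // Switch always listens to the most recent inner observable.
        using (dataProperty.Switch().Subscribe(log.Add))
        {
            first.OnNext(Unit.Default);                        // forwarded
            dataProperty.OnNext(second.Select(_ => "second")); // property replaced
            first.OnNext(Unit.Default);                        // ignored: old stream was dropped
            second.OnNext(Unit.Default);                       // forwarded
        }

        return log; // contains "first", then "second"
    }
}
```

Without the Switch (that is, subscribing to first directly), the subscriber would keep listening to the old stream after the replacement, which is the missing-events situation described above.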



Answer 2:

This should do it.

IObservable<Unit> merged =
    Observable.Merge
    ( this.WhenAnyValue(o=>o.IsTrue, v=>Unit.Default)
    , this.Data
    , this.MyCol.CollectionChanged.Select(v=>Unit.Default)
    );

Theoretically you could write a special version of merge that would disregard the type of the observable and return IObservable<Unit>. Then you could write

IObservable<Unit> merged =
    Observable.MergeToUnit
    ( this.WhenAnyValue(o=>o.IsTrue)
    , this.Data
    , this.MyCol.CollectionChanged
    );

but then you would need many overloads of MergeToUnit for up to the N parameters you would like to support.
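One possible way to sidestep the overloads, sketched here with plain Rx only (assuming the System.Reactive package), is to convert each source individually and then call the ordinary Merge. ToUnit is a hypothetical helper name, not part of Rx or ReactiveUI, and the three subjects are stand-ins for the real sources.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class UnitExtensions
{
    // Hypothetical helper: discards the element and keeps only the
    // fact that something was emitted.
    public static IObservable<Unit> ToUnit<T>(this IObservable<T> source)
    {
        return source.Select(_ => Unit.Default);
    }
}

public static class ToUnitSketch
{
    public static int Run()
    {
        // Stand-ins for three sources with different element types.
        var flags = new Subject<bool>();
        var names = new Subject<string>();
        var ticks = new Subject<Unit>();

        var count = 0;

        // Once every source is an IObservable<Unit>, the standard
        // params overload of Merge accepts any number of them.
        using (Observable.Merge(flags.ToUnit(), names.ToUnit(), ticks)
                         .Subscribe(_ => count++))
        {
            flags.OnNext(true);
            names.OnNext("x");
            ticks.OnNext(Unit.Default);
        }

        return count; // one notification per emission: 3
    }
}
```

The cost is one extra call per source at the call site, in exchange for not maintaining a family of MergeToUnit overloads.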

The most general pattern to use with WhenAny with multiple objects is

Observable.CombineLatest
  ( source0.WhenAnyValue(s=>s.FieldA)
  , source1.WhenAnyValue(s=>s.FieldB)
  , source2.WhenAnyValue(s=>s.FieldC)
  , source3.WhenAnyValue(s=>s.FieldD)
  , (a,b,c,d) => Process(a,b,c,d)
  )

It's sometimes just better to get used to using the standard combinators.
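Note that Merge and CombineLatest are not interchangeable: Merge forwards every emission from any source, while CombineLatest produces nothing until each source has emitted at least once and then yields a combined value. A small sketch with plain Rx subjects (assuming the System.Reactive package; the subjects a and b are made up for illustration) shows the difference:

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class CombinatorSketch
{
    public static (int merged, int combined) Run()
    {
        var a = new Subject<int>();
        var b = new Subject<string>();
        int merged = 0, combined = 0;

        using (Observable.Merge(a.Select(_ => Unit.Default),
                                b.Select(_ => Unit.Default))
                         .Subscribe(_ => merged++))
        using (a.CombineLatest(b, (x, y) => $"{x}:{y}")
                .Subscribe(_ => combined++))
        {
            a.OnNext(1);   // Merge fires; CombineLatest waits, b has no value yet
            a.OnNext(2);   // Merge fires; CombineLatest still waits
            b.OnNext("k"); // both fire
        }

        return (merged, combined); // (3, 1)
    }
}
```

So if the goal is just "tell me when anything changed", Merge matches it more directly; CombineLatest fits when the handler needs the latest value of every source.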