Rx: Ignoring updates caused by Subscribers

Posted 2019-05-23 11:23

Question:

I'm having problems figuring out how to do this. I have two instances (source & target) that implement INotifyPropertyChanged and I'm tracking the PropertyChanged event for both. What I want to do is run an action any time source.PropertyChanged is raised until target.PropertyChanged is raised. I can do that just fine like this:

INotifyPropertyChanged source;
INotifyPropertyChanged target;

var sourcePropertyChanged = Observable
    .FromEvent<PropertyChangedEventArgs>(source, "PropertyChanged")
    .Where(x => x.EventArgs.PropertyName == sourcePropertyName);

var targetPropertyChanged = Observable
    .FromEvent<PropertyChangedEventArgs>(target, "PropertyChanged")
    .Where(x => x.EventArgs.PropertyName == targetPropertyName);

sourcePropertyChanged
    .TakeUntil(targetPropertyChanged)
    .ObserveOnDispatcher()
    .Subscribe(_ => /*Raises target.PropertyChanged for targetPropertyName*/);

The problem is that I want to ignore the PropertyChanged notifications caused by the action itself, and only stop taking values when the PropertyChanged event is raised by an external source. Is there a good way to do that?

Answer 1:

There's no built-in way of doing what you're describing. Here's a simple SkipWhen implementation that skips the next source value each time a value is received on the 'other' sequence:

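public static IObservable<TSource> SkipWhen<TSource, TOther>(
    this IObservable<TSource> source,
    IObservable<TOther> other)
{
    // Defer so that each subscriber gets its own lock and skip stack.
    return Observable.Defer<TSource>(() =>
    {
        object lockObject = new object();
        Stack<TOther> skipStack = new Stack<TOther>();

        // Each value from 'other' marks one upcoming source value to skip.
        // Note: this simple version never disposes this subscription.
        other.Subscribe(x => { lock (lockObject) { skipStack.Push(x); } });

        return source.Where(_ =>
        {
            lock (lockObject)
            {
                if (skipStack.Count > 0)
                {
                    // A skip is pending: consume it and drop this value.
                    skipStack.Pop();
                    return false;
                }
                else
                {
                    return true;
                }
            }
        });
    });
}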
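
To make the skipping behaviour concrete, here is a small self-contained sketch. It assumes the System.Reactive package is referenced. The operator name SkipNextWhen and the class names are hypothetical: it is a variant of the idea above, not the code from this answer, that counts pending skips with an integer instead of a stack and uses Observable.Create so that the subscription to 'other' is released together with the source subscription.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ObservableSkipExtensions
{
    // Hypothetical variant of SkipWhen: drops the next source value
    // once for every value that 'other' has produced.
    public static IObservable<TSource> SkipNextWhen<TSource, TOther>(
        this IObservable<TSource> source, IObservable<TOther> other)
    {
        return Observable.Create<TSource>(observer =>
        {
            var gate = new object();
            var pendingSkips = 0;

            // Every value from 'other' schedules one skip.
            var otherSubscription = other.Subscribe(_ =>
            {
                lock (gate) { pendingSkips++; }
            });

            var sourceSubscription = source
                .Where(_ =>
                {
                    lock (gate)
                    {
                        if (pendingSkips == 0) return true;
                        pendingSkips--;   // consume one pending skip
                        return false;
                    }
                })
                .Subscribe(observer);

            // Disposing the result tears down both subscriptions.
            return new CompositeDisposable(otherSubscription, sourceSubscription);
        });
    }
}

public static class Program
{
    public static List<int> Run()
    {
        var source = new Subject<int>();
        var skipSignal = new Subject<Unit>();
        var received = new List<int>();

        using (source.SkipNextWhen(skipSignal).Subscribe(x => received.Add(x)))
        {
            source.OnNext(1);                 // passes through
            skipSignal.OnNext(Unit.Default);  // schedules one skip
            source.OnNext(2);                 // dropped
            source.OnNext(3);                 // passes through
        }

        return received;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));  // prints "1, 3"
    }
}
```

The signal has to arrive before the value it is meant to suppress, which is why the updated code below calls OnNext on the subject before raising the target's PropertyChanged.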

Your code would then be updated like so (see my note below):

INotifyPropertyChanged source;
INotifyPropertyChanged target;

// See the link at the bottom of my answer
var sourcePropertyChanged = source.GetPropertyChangeValues(x => x.SourceProperty);

// Unit is Rx's "void"
var targetChangedLocally = new Subject<Unit>();

var targetPropertyChanged = target.GetPropertyChangeValues(x => x.TargetProperty)
    .SkipWhen(targetChangedLocally);

sourcePropertyChanged
    .TakeUntil(targetPropertyChanged)
    .ObserveOnDispatcher()
    .Subscribe(_ =>
    {
        // Signal first, so the change we are about to make is skipped.
        targetChangedLocally.OnNext(Unit.Default);
        /*Raises target.PropertyChanged for targetPropertyName*/
    });

NB: I recently blogged about a strongly typed IObservable wrapper around INotifyPropertyChanged events; feel free to steal that code.



Answer 2:

There's no built-in way, but you could probably filter out events using the Where extension method for observables. The condition to filter on would be the sender of the event. I suppose that the sender of a target.PropertyChanged event is different from the sender of a PropertyChanged event raised by another source.

I'm not entirely sure if this is an approach you can use.



Answer 3:

Using locks in Rx this way is fine. The lock is short-lived and doesn't call out to user code.