Does my “zipLatest” operator already exist?

Posted 2020-08-26 04:41

Question:

Quick question about an operator I've written for myself.

Please excuse my poor-man's marble diagrams:

zip
aa--bb--cc--dd--ee--ff--------gg
--11----22--33--------44--55----
================================
--a1----b2--c3--------d4--e5----

combineLatest
aa--bb--cc--dd--ee--ff--------gg
--11----22--33--------44--55----
================================
--a1b1--c2--d3--e3--f3f4--f5--g5

zipLatest
aa--bb--cc--dd--ee--ff--------gg
--11----22--33--------44--55----
================================
--a1----c2--d3--------f4------g5

zipLatest (the one I wrote) fires at almost the same times as zip, but without the queueing zip includes.
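
To make the queueing concrete, here is a minimal TypeScript sketch that models zip without RxJS. The names ZipEvent, zipModel and zipTimeline are hypothetical, and the event order is simply read off the marble diagrams above. It is only an illustration of the semantics, not how any library implements zip.

```typescript
// Hypothetical model, not an RxJS API: each event records which source fired.
type ZipEvent = { source: "left" | "right"; value: string };

// zip keeps one FIFO queue per source and always pairs the oldest unpaired values.
function zipModel(events: ZipEvent[]): string[] {
  const leftQueue: string[] = [];
  const rightQueue: string[] = [];
  const output: string[] = [];
  for (const event of events) {
    (event.source === "left" ? leftQueue : rightQueue).push(event.value);
    // Emit as soon as both queues hold at least one value.
    if (leftQueue.length > 0 && rightQueue.length > 0) {
      output.push(leftQueue.shift()! + rightQueue.shift()!);
    }
  }
  return output;
}

// Arrival order taken from the diagrams: a 1 b c 2 d 3 e f 4 5 g.
const zipTimeline: ZipEvent[] = "a1bc2d3ef45g".split("").map(
  (value): ZipEvent => ({ source: /[a-z]/.test(value) ? "left" : "right", value })
);

const zipOutput = zipModel(zipTimeline); // ["a1", "b2", "c3", "d4", "e5"]
```

The queues are what make zip pair "b" with "2" even though "c" has already arrived; zipLatest drops that backlog.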

I've already implemented it; I'm just wondering whether it already exists. I once wrote a similar method, only to discover by chance that I had reimplemented the sample operator without knowing it.

So, does this already exist in the framework, or exist as a trivial composition of elements I haven't thought of?

Note: I don't want to rely on equality of my inputs to deduplicate (a la distinctUntilChanged). It should work with a signal that only outputs "a" on an interval.

Answer 1:

To give an update on the issue: There is still no operator for this requirement included in RxJS 6 and none seems to be planned for future releases. There are also no open pull requests that propose this operator.

As suggested here, a combination of combineLatest, first and repeat will produce the expected behaviour:

combineLatest(obs1, obs2).pipe(first(), repeat());

combineLatest waits until both Observables have emitted, keeping only the latest value from each and discarding the rest. first completes the Observable after that first combined emission, and repeat then resubscribes to combineLatest, which starts waiting again for fresh values from both Observables. Note that this relies on the sources being hot: resubscribing to a cold Observable would restart it from the beginning.
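
The wait, emit, start-over cycle described above can be sketched without RxJS as a small state machine. This is only a model under stated assumptions: LatestEvent, zipLatestModel and latestTimeline are hypothetical names, the sources are assumed to be hot, and the event order is read off the question's diagram with the letter arriving first on ties.

```typescript
// Hypothetical model, not an RxJS API: each event records which source fired.
type LatestEvent = { source: "left" | "right"; value: string };

function zipLatestModel(events: LatestEvent[]): string[] {
  // One slot per source; a newer value overwrites an older one (no queue).
  let left: string | undefined;
  let right: string | undefined;
  const output: string[] = [];
  for (const event of events) {
    if (event.source === "left") {
      left = event.value;
    } else {
      right = event.value;
    }
    // Both slots filled: emit once (like first()), then clear both slots,
    // which plays the role of repeat() resubscribing with no remembered values.
    if (left !== undefined && right !== undefined) {
      output.push(left + right);
      left = undefined;
      right = undefined;
    }
  }
  return output;
}

// Arrival order taken from the question's diagram: a 1 b c 2 d 3 e f 4 5 g.
const latestTimeline: LatestEvent[] = "a1bc2d3ef45g".split("").map(
  (value): LatestEvent => ({ source: /[a-z]/.test(value) ? "left" : "right", value })
);

const latestOutput = zipLatestModel(latestTimeline); // ["a1", "c2", "d3", "f4", "g5"]
```

The result matches the zipLatest row of the question's diagram. If "2" arrived before "c" on the tie, this model would emit "b2" instead, so ordering of simultaneous events matters.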

The resubscription behaviour of repeat is not fully documented, but can be found in the GitHub source:

source.subscribe(this._unsubscribeAndRecycle());



Answer 2:

Although you specifically mention not wanting to use DistinctUntilChanged, you can combine it with a counter to tell new values apart:

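using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Extension methods must be declared inside a static class (wrapper omitted here).
public static IObservable<(T, TSecond)> ZipLatest<T, TSecond>(this IObservable<T> source, IObservable<TSecond> second)
{
    // Tag every value with its emission index so duplicates are detected by index, not by value.
    return source.Select((value, id) => (value, id))
        .CombineLatest(second.Select((value, id) => (value, id)), ValueTuple.Create)
        // Let a pair through only when both indices differ from the last emitted pair.
        .DistinctUntilChanged(x => (x.Item1.id, x.Item2.id), new AnyEqualityComparer<int, int>())
        .Select(x => (x.Item1.value, x.Item2.value));
}

// Treats two keys as equal when either component matches.
public class AnyEqualityComparer<T1, T2> : IEqualityComparer<(T1 a, T2 b)>
{
    public bool Equals((T1 a, T2 b) x, (T1 a, T2 b) y) => Equals(x.a, y.a) || Equals(x.b, y.b);
    // DistinctUntilChanged only calls Equals, so hashing is deliberately unsupported.
    public int GetHashCode((T1 a, T2 b) obj) => throw new NotSupportedException();
}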

Note that I've used Int32 here, because that's what Select() gives me, but it might be too small for some use cases. Int64 or Guid might be a better choice.
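
For readers more at home in TypeScript, the same counter idea can be modelled without any Rx library. This is a rough sketch, not a port of the C# code: IndexedEvent and zipLatestByIndex are hypothetical names, and the loop stands in for CombineLatest followed by DistinctUntilChanged with an "either index equal" comparer.

```typescript
// Hypothetical model, not a library API: each event records which source fired.
type IndexedEvent = { source: "left" | "right"; value: string };

function zipLatestByIndex(events: IndexedEvent[]): string[] {
  let left: { value: string; index: number } | undefined;
  let right: { value: string; index: number } | undefined;
  let lastKey: { left: number; right: number } | undefined;
  let leftCount = 0;
  let rightCount = 0;
  const output: string[] = [];
  for (const event of events) {
    // Tag each value with a per-source counter, like Select((value, id) => ...).
    if (event.source === "left") {
      left = { value: event.value, index: leftCount++ };
    } else {
      right = { value: event.value, index: rightCount++ };
    }
    // Like CombineLatest: nothing is produced until both sources have fired.
    if (left === undefined || right === undefined) continue;
    // Like AnyEqualityComparer: a pair counts as a duplicate of the last
    // emitted pair when either index is unchanged.
    const duplicate =
      lastKey !== undefined &&
      (lastKey.left === left.index || lastKey.right === right.index);
    if (!duplicate) {
      lastKey = { left: left.index, right: right.index };
      output.push(left.value + right.value);
    }
  }
  return output;
}

// Identical values on the left source still work, because only indices are compared.
const sameValues: IndexedEvent[] = "a1aa2a3".split("").map(
  (value): IndexedEvent => ({ source: value === "a" ? "left" : "right", value })
);

const indexedOutput = zipLatestByIndex(sameValues); // ["a1", "a2", "a3"]
```

Because the comparison uses the counters rather than the values, a source that only ever emits "a" is handled correctly, which is the requirement stated in the question.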