Quick question about an operator I've written for myself.
Please excuse my poor-man's marble diagrams:
zip
aa--bb--cc--dd--ee--ff--------gg
--11----22--33--------44--55----
================================
--a1----b2--c3--------d4--e5----
combineLatest
aa--bb--cc--dd--ee--ff--------gg
--11----22--33--------44--55----
================================
--a1b1--c2--d3--e3--f3f4--f5--g5
zipLatest
aa--bb--cc--dd--ee--ff--------gg
--11----22--33--------44--55----
================================
--a1----c2--d3--------f4------g5
zipLatest (the one I wrote) fires at almost the same times as zip, but without the queueing zip includes.
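The intended behaviour can be sketched without any library, roughly as follows. This is only a minimal illustration of the zipLatest row in the diagram, not a framework operator: createZipLatest, nextA and nextB are hypothetical names, and it assumes that when both sources emit in the same time slot, the first source's value arrives before the second's.

```typescript
// Hypothetical push-based sketch of zipLatest with no library dependency.
// A pair is emitted only once BOTH sides have produced a value since the
// previous emission; older values are overwritten rather than queued.
function createZipLatest<A, B>(emit: (pair: [A, B]) => void) {
  let latestA: A | undefined;
  let latestB: B | undefined;
  let freshA = false; // true once the first source has emitted since the last pair
  let freshB = false; // true once the second source has emitted since the last pair

  const tryEmit = (): void => {
    if (freshA && freshB) {
      freshA = false;
      freshB = false;
      emit([latestA as A, latestB as B]);
    }
  };

  return {
    nextA(value: A): void { latestA = value; freshA = true; tryEmit(); },
    nextB(value: B): void { latestB = value; freshB = true; tryEmit(); },
  };
}

// Replay of the marble diagram in arrival order.
// Assumption: for simultaneous emissions (c/2 and d/3) the first source goes first.
const timeline: Array<["first" | "second", string]> = [
  ["first", "a"], ["second", "1"], ["first", "b"],
  ["first", "c"], ["second", "2"],
  ["first", "d"], ["second", "3"],
  ["first", "e"], ["first", "f"], ["second", "4"],
  ["second", "5"], ["first", "g"],
];

const output: string[] = [];
const zipper = createZipLatest<string, string>(([a, b]) => { output.push(a + b); });
for (const [source, value] of timeline) {
  if (source === "first") zipper.nextA(value);
  else zipper.nextB(value);
}
console.log(output.join(" ")); // prints "a1 c2 d3 f4 g5"
```

Freshness is tracked with flags rather than by comparing values, so a source that keeps emitting the same value still counts as new on every emission.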
I've already implemented it; I'm just wondering whether this already exists.
I know I wrote a similar method in the past, only to discover by random chance that I'd written the sample operator without knowing it.
So, does this already exist in the framework, or exist as a trivial composition of elements I haven't thought of?
Note: I don't want to rely on equality of my inputs to deduplicate (a la distinctUntilChanged).
It should work with a signal that only outputs "a" on an interval.
To give an update on the issue: There is still no operator for this requirement included in RxJS 6 and none seems to be planned for future releases. There are also no open pull requests that propose this operator.
As suggested here, a combination of combineLatest, first and repeat will produce the expected behaviour:
combineLatest(obs1, obs2).pipe(first(), repeat());
combineLatest waits until both Observables have emitted, discarding everything except the latest value of each. first completes the Observable after that emission, and repeat resubscribes to combineLatest, causing it to wait again for the latest values of both Observables.
The resubscription behaviour of repeat is not fully documented, but can be found in the GitHub source:
source.subscribe(this._unsubscribeAndRecycle());
Though you specifically mention not wanting to use DistinctUntilChanged, you can use it with a counter to distinguish new values:
public static IObservable<(T, TSecond)> ZipLatest<T, TSecond>(this IObservable<T> source, IObservable<TSecond> second)
{
    // Tag every value with its emission index so "new" is decided by index, not by value.
    return source.Select((value, id) => (value, id))
        .CombineLatest(second.Select((value, id) => (value, id)), ValueTuple.Create)
        // Suppress a pair when either index is unchanged since the last emitted pair.
        .DistinctUntilChanged(x => (x.Item1.id, x.Item2.id), new AnyEqualityComparer<int, int>())
        .Select(x => (x.Item1.value, x.Item2.value));
}

public class AnyEqualityComparer<T1, T2> : IEqualityComparer<(T1 a, T2 b)>
{
    // Two keys count as equal when either component matches.
    public bool Equals((T1 a, T2 b) x, (T1 a, T2 b) y) => Equals(x.a, y.a) || Equals(x.b, y.b);

    // Not needed here: DistinctUntilChanged only compares keys via Equals.
    public int GetHashCode((T1 a, T2 b) obj) => throw new NotSupportedException();
}
Note that I've used Int32 here, because that's what Select() gives me, but it might be too small for some use cases. Int64 or Guid might be a better choice.