How should one go about implementing a DistinctLat

Posted 2019-09-15 01:25

Question:

I asked an earlier question, "A cache serving updates and new values as “DistinctLatest” and full cache contents upon subscription", which the community handled well. It was pointed out there that the actual goal, caching and replacing values as defined in that question, could be expressed with a .DistinctLatest operator.

OK! There doesn't seem to be much discussion of such an operator. While searching and thinking about it, I found "ReactiveX: Group and Buffer only last item in each group", which is fairly close. To mimic the original issue, I tried to write the caching operator as

/// <summary>
/// A cache that keeps distinct elements where the elements are replaced by the latest.
/// </summary>
/// <typeparam name="T">The type of the result</typeparam>
/// <typeparam name="TKey">The type of the selector key for distinct results.</typeparam>
/// <param name="newElements">The sequence of new elements.</param>
/// <param name="seedElements">The seed elements when the cache is started.</param>
/// <param name="replacementSelector">The replacement selector to choose distinct elements in the cache.</param>
/// <returns>The cache contents upon first call and changes thereafter.</returns>
public static IObservable<T> Cache<T, TKey>(this IObservable<T> newElements, IEnumerable<T> seedElements, Func<T, TKey> replacementSelector)
{
    var s = newElements.StartWith(seedElements).GroupBy(replacementSelector).Select(groupObservable =>
    {
        var replaySubject = new ReplaySubject<T>(1);
        groupObservable.Subscribe(value => replaySubject.OnNext(value));

        return replaySubject;
    });

    return s.SelectMany(i => i);
}

But testing shows that this doesn't do the trick either. It looks like if one subscribes at the beginning, the initial values and the updates (and new values) are observed, whereas if one subscribes at the end, only the replaced seed values are recorded.

Now, I wonder about a general DistinctLast operator. I thought the code above was one, but it doesn't work. What this "cache" should add on top of it is the seed values and the flattening of the groups, but that's not what the test shows. I also tried a few things with grouping and .TakeLast(), but no dice.

I'd be delighted if anyone had pointers or thoughts about this, and hopefully this turns out to be something generally useful.

Answer 1:

@LeeCampbell did most of the work for this; see the other referenced question. Anyway, here's the code:

public static class RxExtensions
{
    public static IObservable<T> DistinctLatest<T, TKey>(this IObservable<T> newElements, IEnumerable<T> seedElements, Func<T, TKey> replacementSelector)
    {
        return seedElements.ToObservable()
            .Concat(newElements)
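            // Group by the key the selector returns. `i => replacementSelector` would use
            // the delegate itself as the key and put every element into one group.
            .GroupBy(replacementSelector)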
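            // Replay(1).RefCount() connects the replay on first subscription. An extra
            // Publish() in between would subscribe to a Replay that is never connected.
            .SelectMany(grp => grp.Replay(1).RefCount());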
    }
}
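
For comparison, here is a minimal sketch of the behaviour the question describes: every subscriber first gets the current cache contents (the latest value per key), then live updates. This is not the code from the referenced question; the names DistinctLatestSketch, keySelector and SketchDemo are hypothetical. It assumes System.Reactive is referenced, it subscribes to the source eagerly and never disposes that subscription, and it calls observers while holding a lock, all of which a production version would want to revisit.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class DistinctLatestSketchExtensions
{
    // Hypothetical operator: keeps the latest element per key and hands the
    // whole cache to each new subscriber before forwarding live updates.
    public static IObservable<T> DistinctLatestSketch<T, TKey>(
        this IObservable<T> newElements,
        IEnumerable<T> seedElements,
        Func<T, TKey> keySelector)
    {
        var gate = new object();
        var latest = new Dictionary<TKey, T>();
        foreach (var seed in seedElements)
            latest[keySelector(seed)] = seed;

        var live = new Subject<T>();

        // Eager subscription (a simplification): record each update under the
        // lock, then forward it to the subscribers that are already attached.
        newElements.Subscribe(
            value =>
            {
                lock (gate)
                {
                    latest[keySelector(value)] = value;
                    live.OnNext(value);
                }
            },
            live.OnError,
            live.OnCompleted);

        return Observable.Create<T>(observer =>
        {
            lock (gate)
            {
                // Snapshot first, then live updates; the lock keeps an update
                // from slipping in between the two steps.
                foreach (var value in latest.Values)
                    observer.OnNext(value);
                return live.Subscribe(observer);
            }
        });
    }
}

public static class SketchDemo
{
    // Returns what an early and a late subscriber observed.
    public static (List<(string Key, int Value)> Early, List<(string Key, int Value)> Late) Run()
    {
        var updates = new Subject<(string Key, int Value)>();
        var seeds = new[] { (Key: "a", Value: 1), (Key: "b", Value: 2) };
        var cache = updates.DistinctLatestSketch(seeds, x => x.Key);

        var early = new List<(string Key, int Value)>();
        cache.Subscribe(x => early.Add(x));

        updates.OnNext((Key: "a", Value: 10)); // replaces the seed for "a"
        updates.OnNext((Key: "c", Value: 3));  // new key

        var late = new List<(string Key, int Value)>();
        cache.Subscribe(x => late.Add(x));

        return (early, late);
    }
}
```

With this sketch, the early subscriber sees both seeds followed by the two updates, while the late subscriber sees only one value per key: 10 for "a", 2 for "b" and 3 for "c". The order of the snapshot follows Dictionary enumeration, which is not guaranteed.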