Rx grouped throttling

2019-07-13 01:17发布

I have an IObservable<T> where T looks like

public class Notification
{
    public int Id { get; set; }
    public int Version { get; set; }
}

Notifications are produced at variable time intervals and for different notifications, where the version number get incremented with each updated per notification id.

What would be a proper approach to throttle the observable for a specific period of time and then to receive distinct notifications with the latest Version field?

So far I came up with this for throttling and grouping, but can't figure out how to actually return IObservable<Notification>.

public static IObservable<int> ThrottledById(this IObservable<Notification> observable)
{
    return observable
        .GroupByUntil(n => n.Id, x => Observable.Timer(TimeSpan.FromSeconds(1)))
        .Select(group => group.Key);
}

Edit: Sample input/output (Throttle delay: 3):

1. { id: 1, v: 1 }
2. { id: 1, v: 2 }  { id: 2, v: 1 }
3. { id: 1, v: 3 }
-----------------------------------> notify { id:1, v: 3 }, notify { id:2, v: 1 }
4. 
5. { id: 2, v: 2 }
6.
-----------------------------------> notify { id:2, v: 2 }
7. { id: 1, v: 4 }
8. { id: 1, v: 5 }  { id: 2, v: 3 }
9. { id: 1, v: 6 }
-----------------------------------> notify { id:1, v: 6 }, notify { id: 2, v: 3 }
...
...

2条回答
SAY GOODBYE
2楼-- · 2019-07-13 01:31

This seems to produce your desired output

 IObservable<Notification> GroupByIdThrottle(IObservable<Notification> producer, IScheduler scheduler)
    {
        return Observable.Create<Notification>(observer =>
        {
            return producer
                .GroupByUntil(
                    item => item.Id,
                    item => Observable.Timer(TimeSpan.FromSeconds(3), scheduler))
                .SelectMany(result =>
                {
                    return result.Aggregate<Notification, Notification>(null, (dict, item) =>
                    {
                        return item;
                    });
                })
                .Subscribe(observer);

        });
    }

The idea being that the aggregate makes it so only the last value of each group makes it out alive and the grouped streams complete once the timer hits 3 seconds.

I cannibalized this

https://social.msdn.microsoft.com/Forums/en-US/7ebf68e8-99af-44e2-b80d-0292cb5546bc/group-by-with-buffer-reactive-extensions?forum=rx

For the idea

查看更多
Summer. ? 凉城
3楼-- · 2019-07-13 01:42

That approach fully meet your expectations:

public static IObservable<Notification> ThrottledById(this IObservable<Notification> observable)
{
    return observable.Buffer(TimeSpan.FromSeconds(3))
        .SelectMany(x =>
            x.GroupBy(y => y.Id)
            .Select(y => y.Last()));
}

If you want to gather all notifications with the same id for n seconds after coming first of them and expose the last one, then approach, based on GroupByUntil, is what you need.

public static IObservable<Notification> ThrottledById(this IObservable<Notification> observable)
{
    return observable.GroupByUntil(x => x.Id, x => Observable.Timer(TimeSpan.FromSeconds(3)))
        .SelectMany(x => x.LastAsync());
}

Your sample input/output would look so:

1. { id: 1, v: 1 }
2. { id: 1, v: 2 }  { id: 2, v: 1 }
3. { id: 1, v: 3 }
-----------------------------------> notify { id:1, v: 3 }
4. 
-----------------------------------> notify { id:2, v: 1 }
5. { id: 2, v: 2 }
6.
7. { id: 1, v: 4 }
-----------------------------------> notify { id:2, v: 2 }
8. { id: 1, v: 5 }  { id: 2, v: 3 }
9. { id: 1, v: 6 }
-----------------------------------> notify { id:1, v: 6 }
10.
-----------------------------------> notify { id: 2, v: 3 }
...
...
查看更多
登录 后发表回答