I have an IObservable<T>
where T looks like
public class Notification
{
public int Id { get; set; }
public int Version { get; set; }
}
Notifications are produced at variable time intervals and for different notifications, where the version number get incremented with each updated per notification id.
What would be a proper approach to throttle the observable for a specific period of time and then to receive distinct notifications with the latest Version field?
So far I came up with this for throttling and grouping, but can't figure out how to actually return IObservable<Notification>
.
public static IObservable<int> ThrottledById(this IObservable<Notification> observable)
{
return observable
.GroupByUntil(n => n.Id, x => Observable.Timer(TimeSpan.FromSeconds(1)))
.Select(group => group.Key);
}
Edit:
Sample input/output (Throttle delay: 3):
1. { id: 1, v: 1 }
2. { id: 1, v: 2 } { id: 2, v: 1 }
3. { id: 1, v: 3 }
-----------------------------------> notify { id:1, v: 3 }, notify { id:2, v: 1 }
4.
5. { id: 2, v: 2 }
6.
-----------------------------------> notify { id:2, v: 2 }
7. { id: 1, v: 4 }
8. { id: 1, v: 5 } { id: 2, v: 3 }
9. { id: 1, v: 6 }
-----------------------------------> notify { id:1, v: 6 }, notify { id: 2, v: 3 }
...
...
This seems to produce your desired output
IObservable<Notification> GroupByIdThrottle(IObservable<Notification> producer, IScheduler scheduler)
{
return Observable.Create<Notification>(observer =>
{
return producer
.GroupByUntil(
item => item.Id,
item => Observable.Timer(TimeSpan.FromSeconds(3), scheduler))
.SelectMany(result =>
{
return result.Aggregate<Notification, Notification>(null, (dict, item) =>
{
return item;
});
})
.Subscribe(observer);
});
}
The idea being that the aggregate makes it so only the last value of each group makes it out alive and the grouped streams complete once the timer hits 3 seconds.
I cannibalized this
https://social.msdn.microsoft.com/Forums/en-US/7ebf68e8-99af-44e2-b80d-0292cb5546bc/group-by-with-buffer-reactive-extensions?forum=rx
For the idea
That approach fully meet your expectations:
public static IObservable<Notification> ThrottledById(this IObservable<Notification> observable)
{
return observable.Buffer(TimeSpan.FromSeconds(3))
.SelectMany(x =>
x.GroupBy(y => y.Id)
.Select(y => y.Last()));
}
If you want to gather all notifications with the same id for n seconds after coming first of them and expose the last one, then approach, based on GroupByUntil, is what you need.
public static IObservable<Notification> ThrottledById(this IObservable<Notification> observable)
{
return observable.GroupByUntil(x => x.Id, x => Observable.Timer(TimeSpan.FromSeconds(3)))
.SelectMany(x => x.LastAsync());
}
Your sample input/output would look so:
1. { id: 1, v: 1 }
2. { id: 1, v: 2 } { id: 2, v: 1 }
3. { id: 1, v: 3 }
-----------------------------------> notify { id:1, v: 3 }
4.
-----------------------------------> notify { id:2, v: 1 }
5. { id: 2, v: 2 }
6.
7. { id: 1, v: 4 }
-----------------------------------> notify { id:2, v: 2 }
8. { id: 1, v: 5 } { id: 2, v: 3 }
9. { id: 1, v: 6 }
-----------------------------------> notify { id:1, v: 6 }
10.
-----------------------------------> notify { id: 2, v: 3 }
...
...