I have an IObservable<T>, where T looks like this:
public class Notification
{
    public int Id { get; set; }
    public int Version { get; set; }
}
Notifications are produced at variable time intervals and for different ids; the version number is incremented with each update for a given notification id.
What would be a proper approach to throttle the observable for a specific period of time and then receive one notification per id, carrying the latest Version?
So far I have come up with this for throttling and grouping, but I can't figure out how to actually return an IObservable<Notification>:
public static IObservable<int> ThrottledById(this IObservable<Notification> observable)
{
    return observable
        .GroupByUntil(n => n.Id, x => Observable.Timer(TimeSpan.FromSeconds(1)))
        .Select(group => group.Key);
}
Edit: Sample input/output (Throttle delay: 3):
1. { id: 1, v: 1 }
2. { id: 1, v: 2 } { id: 2, v: 1 }
3. { id: 1, v: 3 }
-----------------------------------> notify { id:1, v: 3 }, notify { id:2, v: 1 }
4.
5. { id: 2, v: 2 }
6.
-----------------------------------> notify { id:2, v: 2 }
7. { id: 1, v: 4 }
8. { id: 1, v: 5 } { id: 2, v: 3 }
9. { id: 1, v: 6 }
-----------------------------------> notify { id:1, v: 6 }, notify { id: 2, v: 3 }
...
...
This seems to produce your desired output.
The idea is that the aggregate lets only the last value of each group through, and each grouped stream completes once its timer hits 3 seconds.
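As a rough illustration of that idea, here is a minimal sketch, assuming System.Reactive is referenced. The class name NotificationExtensions and the window parameter are my own additions for illustration, and this is not necessarily the exact code the answer used. Each group is closed by a timer, and Aggregate keeps only the highest-Version notification seen in that group:

```csharp
using System;
using System.Reactive.Linq;

public class Notification
{
    public int Id { get; set; }
    public int Version { get; set; }
}

public static class NotificationExtensions
{
    // Hypothetical helper: the name and the window parameter are illustrative.
    public static IObservable<Notification> ThrottledById(
        this IObservable<Notification> source, TimeSpan window)
    {
        return source
            // Open one group per Id; close it when the timer started at the
            // group's first notification fires (or when the source completes).
            .GroupByUntil(n => n.Id, group => Observable.Timer(window))
            // Aggregate emits a single value when the group completes:
            // here, the notification with the highest Version seen so far.
            .SelectMany(group => group.Aggregate(
                (latest, next) => next.Version >= latest.Version ? next : latest));
    }
}
```

It could be used as notifications.ThrottledById(TimeSpan.FromSeconds(3)).Subscribe(...). Note that each id gets its own window, starting at that id's first notification, so the emission times can differ from the fixed 3-step windows drawn in the sample.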
I borrowed the idea from this thread:
https://social.msdn.microsoft.com/Forums/en-US/7ebf68e8-99af-44e2-b80d-0292cb5546bc/group-by-with-buffer-reactive-extensions?forum=rx
That approach fully meets your expectations:
If you want to gather all notifications with the same id for n seconds after the first of them arrives and then emit only the last one, then the approach based on GroupByUntil is what you need.
Your sample input/output would look like this: