I'm using Reactive Extensions (Rx) to collate data into buffers of 100ms:
this.subscription = this.dataService
    .Where(x => !string.Equals("FOO", x.Key.Source))
    .Buffer(TimeSpan.FromMilliseconds(100))
    .ObserveOn(this.dispatcherService)
    .Where(x => x.Count != 0)
    .Subscribe(this.OnBufferReceived);
This works fine. However, I want slightly different behavior than that provided by the Buffer operation. Essentially, I want to reset the timer if another data item is received. Only when no data has been received for the entire 100ms do I want to handle it. This opens up the possibility of never handling the data, so I should also be able to specify a maximum count. I would imagine something along the lines of:
.SlidingBuffer(TimeSpan.FromMilliseconds(100), 10000)
I've had a look around and haven't been able to find anything like this in Rx. Can anyone confirm or deny this?
With Rx Extensions 2.0, you can answer both requirements with a new Buffer overload accepting a timeout and a size, Buffer(TimeSpan timeSpan, int count).
See https://msdn.microsoft.com/en-us/library/hh229200(v=vs.103).aspx for the documentation.
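A rough sketch of how that overload might be applied to the query from the question. The `BufferOverloadDemo` class and `Batch` helper are illustrative names, not part of the original answer. Note that `Buffer(TimeSpan, int)` closes a buffer when either the time span elapses or the count is reached; the timer is not restarted by each incoming item, so this covers the maximum-count requirement but is not a true inactivity timeout.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class BufferOverloadDemo
{
    // Hypothetical helper: closes each batch after 100 ms or 10000 items,
    // whichever comes first, and drops empty batches.
    public static IObservable<IList<T>> Batch<T>(IObservable<T> source)
    {
        return source
            .Buffer(TimeSpan.FromMilliseconds(100), 10000)
            .Where(batch => batch.Count != 0);
    }
}
```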
This is possible by combining the built-in Window and Throttle methods of Observable. First, let's solve the simpler problem where we ignore the maximum count condition:
The powerful Window method did the heavy lifting. Now it's easy enough to see how to add a maximum count:
I'll write a post explaining this on my blog. https://gist.github.com/2244036
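For readers without access to the gist, here is a minimal sketch of the Window plus Throttle idea described above. It is not the answerer's code: the method name `BufferUntilInactive`, the use of `Publish` to share one subscription, and the `Skip(maxCount - 1)` trick for the count limit are all assumptions made for illustration. Each window is closed by a Throttle over the same stream, which only fires after `delay` of silence.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class WindowThrottleSketch
{
    // Emits a list each time the source has been silent for `delay`.
    public static IObservable<IList<T>> BufferUntilInactive<T>(
        this IObservable<T> source, TimeSpan delay)
    {
        // Publish shares a single subscription between Window and Throttle.
        return source.Publish(shared =>
            shared.Window(() => shared.Throttle(delay))
                  .SelectMany(window => window.ToList())
                  .Where(list => list.Count > 0));
    }

    // As above, but a window is also closed once about maxCount items arrive.
    public static IObservable<IList<T>> BufferUntilInactive<T>(
        this IObservable<T> source, TimeSpan delay, int maxCount)
    {
        if (maxCount < 1) throw new ArgumentOutOfRangeException(nameof(maxCount));

        return source.Publish(shared =>
            shared.Window(() =>
                      // The closing sequence is re-created for every window, so
                      // Skip counts items from the start of the current window.
                      shared.Throttle(delay).Merge(shared.Skip(maxCount - 1)))
                  .SelectMany(window => window.ToList())
                  .Where(list => list.Count > 0));
    }
}
```

With this in place the query from the question could end in something like `.BufferUntilInactive(TimeSpan.FromMilliseconds(100), 10000)` in place of `.Buffer(...)`. Whether the item that triggers the count limit lands in the closing window or the next one depends on subscription order inside Window, so treat the limit as approximate.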
Documentation for the Window method:
I wrote an extension to do most of what you're after, BufferWithInactivity. Here it is:
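One way such an extension could be written is with an explicit timer that is rescheduled on every item. The sketch below reuses the name `BufferWithInactivity` but is not the answerer's implementation; the parameter names, the `IScheduler` argument, and the version counter used to ignore stale timers are all assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class InactivityBufferSketch
{
    public static IObservable<IList<T>> BufferWithInactivity<T>(
        this IObservable<T> source, TimeSpan inactivity,
        int maximumBufferSize, IScheduler scheduler)
    {
        return Observable.Create<IList<T>>(observer =>
        {
            var gate = new object();
            var buffer = new List<T>();
            var timer = new SerialDisposable();
            long version = 0; // bumped on every item so stale timers do nothing

            // Must be called while holding the lock.
            Action flush = () =>
            {
                if (buffer.Count == 0) return;
                var batch = buffer;
                buffer = new List<T>();
                observer.OnNext(batch);
            };

            var subscription = source.Subscribe(
                item =>
                {
                    lock (gate)
                    {
                        buffer.Add(item);
                        var current = ++version;
                        if (buffer.Count >= maximumBufferSize)
                        {
                            // Size limit reached: cancel the timer and emit now.
                            timer.Disposable = Disposable.Empty;
                            flush();
                        }
                        else
                        {
                            // Restart the inactivity timer for this item.
                            timer.Disposable = scheduler.Schedule(inactivity, () =>
                            {
                                lock (gate)
                                {
                                    if (current == version) flush();
                                }
                            });
                        }
                    }
                },
                error => { lock (gate) { timer.Dispose(); observer.OnError(error); } },
                () => { lock (gate) { timer.Dispose(); flush(); observer.OnCompleted(); } });

            return new CompositeDisposable(subscription, timer);
        });
    }
}
```

A call might look like `.BufferWithInactivity(TimeSpan.FromMilliseconds(100), 10000, Scheduler.Default)`. Batches are emitted while the lock is held, which keeps ordering simple at the cost of blocking the source during a slow `OnNext`.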
I guess this can be implemented on top of the Buffer method as shown below:
NOTE: I haven't tested it, but I hope it gives you the idea.
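A minimal sketch of what building on Buffer could look like, assuming the approach is to accumulate fixed-length time chunks and emit once a chunk comes back empty (no activity for a whole span) or the accumulated count reaches the maximum. The class name and parameter names are illustrative, and `SlidingBuffer` is simply the name proposed in the question.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class BufferBasedSketch
{
    public static IObservable<IList<T>> SlidingBuffer<T>(
        this IObservable<T> source, TimeSpan span, int max)
    {
        return Observable.Create<IList<T>>(observer =>
        {
            var pending = new List<T>();

            return source.Buffer(span).Subscribe(
                chunk =>
                {
                    pending.AddRange(chunk);
                    var quiet = chunk.Count == 0; // nothing arrived in this span

                    if (pending.Count > 0 && (quiet || pending.Count >= max))
                    {
                        // Hand over the list and start a fresh one, so the
                        // emitted list is never mutated afterwards.
                        observer.OnNext(pending);
                        pending = new List<T>();
                    }
                },
                observer.OnError,
                () =>
                {
                    // Emit whatever is left before completing.
                    if (pending.Count > 0) observer.OnNext(pending);
                    observer.OnCompleted();
                });
        });
    }
}
```

This only approximates a sliding timeout: the spans are fixed rather than restarted per item, so a batch is emitted somewhere between one and two spans after the last item, and the count is checked only at chunk boundaries, so a batch can exceed `max`.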