Building on this question which discusses flushing a time-based buffer: Force flush to Observable.Buffer c#, I'm having difficulty working out how to translate this answer given there to the case where I'm buffering by count, not by time:
var subject = new Subject<Unit>();
var closing = Observable
.Timer(new TimeSpan(0, 0, 1, 30))
.Select(x => Unit.Default);
var query =
mFluxObservable
.Buffer(() => Observable
.Amb(subject, closing)
.Take(1));
I started using the same Amb
logic, using an 'item counter' instead of Timer, but found myself going down the rabbit hole of trying to work out how to reset that.
Can you push me gently in the direction of working out how to implement my missing function?
var flusher = new Subject<Unit>();
var source = Observable.Interval(TimeSpan.FromSeconds(0.1));
var output = source.BufferExceptOnFlush(100, flusher);
My source is 'hot', if that helps...
PS: I could figure something out using Observable.Create
and some kind of internal counter, but not without locking...
This is what I've got so far:
I think you can do it by using the source in the closing observable and merging that with a flushing observable. The following worked for me:
I see nothing wrong with an
Observable.Create<T>
solution. In which case I think this extension should workI have not tested it yet, but would enable usage like this
It will work for both hot and cold observables as there is no double subscription