Building on the question "Force flush to Observable.Buffer c#", which discusses flushing a time-based buffer, I'm having difficulty translating the answer given there to the case where I'm buffering by count rather than by time:
var subject = new Subject<Unit>();
var closing = Observable
    .Timer(new TimeSpan(0, 0, 1, 30))
    .Select(x => Unit.Default);
var query =
    mFluxObservable
        .Buffer(() => Observable
            .Amb(subject, closing)
            .Take(1));
I started with the same Amb logic, using an 'item counter' instead of the Timer, but found myself going down the rabbit hole of trying to work out how to reset that counter.
Can you push me gently in the direction of working out how to implement my missing function?
var flusher = new Subject<Unit>();
var source = Observable.Interval(TimeSpan.FromSeconds(0.1));
var output = source.BufferExceptOnFlush(100, flusher);
My source is 'hot', if that helps...
PS: I could figure something out using Observable.Create and some kind of internal counter, but not without locking...
I think you can do it by using the source in the closing observable and merging that with a flushing observable. The following worked for me:
var source = new Subject<Unit>();
var flush = new Subject<Unit>();
// close buffer every 3 values or when a flush value arrives
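var closing = source.Buffer(3)
    .Select(x => Unit.Default)
    .Merge(flush);
var query = source.Buffer(() => closing)
    // print the size of each buffer (passing Console.WriteLine directly would print the list's type name)
    .Subscribe(buffer => Console.WriteLine(buffer.Count));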
// some test values
source.OnNext(Unit.Default);
source.OnNext(Unit.Default);
source.OnNext(Unit.Default);
source.OnNext(Unit.Default);
// flush buffer
flush.OnNext(Unit.Default);
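To make the buffer contents visible, here is a minimal sketch of the same idea using int values instead of Unit. The class name CountOrFlushDemo and the test values are my own, and it assumes the System.Reactive NuGet package. The point it illustrates is that the closing sequence is re-subscribed for every new buffer, so the count of 3 starts over after a flush.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class CountOrFlushDemo
{
    // Pushes 1..7 with a flush after 4 and returns the buffers that were emitted.
    public static List<string> Run()
    {
        var source = new Subject<int>();
        var flush = new Subject<Unit>();
        var seen = new List<string>();

        // Close the current buffer on every 3rd value or when a flush arrives.
        var closing = source.Buffer(3)
            .Select(_ => Unit.Default)
            .Merge(flush);

        // The selector is invoked for each new buffer, so 'closing' is
        // subscribed afresh (and its inner count restarts) every time.
        using (source.Buffer(() => closing)
            .Subscribe(b => seen.Add(string.Join(",", b))))
        {
            source.OnNext(1);
            source.OnNext(2);
            source.OnNext(3);
            source.OnNext(4);
            flush.OnNext(Unit.Default); // force out the partial buffer
            source.OnNext(5);
            source.OnNext(6);
            source.OnNext(7);
        }
        return seen;
    }

    public static void Main()
    {
        foreach (var b in Run())
            Console.WriteLine("[" + b + "]");
    }
}
```

With the Rx.NET behaviour I'm aware of, this should give the buffers [1,2,3], [4] and [5,6,7]. Note that it relies on source being hot (a Subject here), since the closing sequence subscribes to source a second time.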
This is what I've got so far:
var flush = new Subject<Unit>();
var source = Observable.Interval(TimeSpan.FromSeconds(1))
    .Select(_ => Unit.Default).Publish().RefCount();
var closer = CloseGenerator(source, flush, 5);
source.Buffer(closer)
//...
private IObservable<Unit> CloseGenerator<T>(IObservable<T> source,
    IObservable<Unit> flusher, int count)
{
    return Observable.CombineLatest(
            source.Select((_, i) => i),
            flusher.Select((_, i) => i).StartWith(-1))
        .Select(ar => Tuple.Create(ar[0], ar[1]))
        .Scan(Tuple.Create(-1, -1), (prev, next) =>
        {
            if (next.Item2 != prev.Item2 || next.Item1 == prev.Item1 + count)
                return next;
            else
                return prev;
        })
        .DistinctUntilChanged().Skip(1) // This is 'DistinctExceptFirst'
        .Select(_ => Unit.Default);
}
I see nothing wrong with an Observable.Create<T> solution. In that case, I think this extension method should work:
public static IObservable<IList<T>> BufferExceptOnFlush<T>(this IObservable<T> source, IObservable<Unit> flusher, int bufferSize)
{
    return Observable.Create<IList<T>>(observer =>
    {
        // Publish so that both buffers below share one subscription to the source
        var shared = source.Publish();
        // Fires after every bufferSize items
        var closing = shared.Buffer(bufferSize).Select(x => Unit.Default);
        // Close each buffer on whichever comes first: a flush or a full count
        var query = shared.Buffer(() => flusher.Amb(closing).Take(1)).SubscribeSafe(observer);
        return new CompositeDisposable(query, shared.Connect());
    });
}
I have not tested it yet, but it would enable usage like this:
var query = myFluxObservable.BufferExceptOnFlush(myFlusher, 5);
It should work for both hot and cold observables, as the source is only subscribed to once.
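Since the extension above is untested, a small check harness along these lines could be used to try it out. This is only a sketch: the class names BufferExtensions and FlushCheck and the test values are made up for illustration, the extension is repeated so the snippet compiles on its own, and it assumes the System.Reactive NuGet package.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class BufferExtensions
{
    // Same extension as above, repeated here so the sketch is self-contained.
    public static IObservable<IList<T>> BufferExceptOnFlush<T>(
        this IObservable<T> source, IObservable<Unit> flusher, int bufferSize)
    {
        return Observable.Create<IList<T>>(observer =>
        {
            var shared = source.Publish();
            var closing = shared.Buffer(bufferSize).Select(_ => Unit.Default);
            var query = shared
                .Buffer(() => flusher.Amb(closing).Take(1))
                .SubscribeSafe(observer);
            return new CompositeDisposable(query, shared.Connect());
        });
    }
}

public static class FlushCheck
{
    // Pushes 1..7 with a flush after 4 and returns the buffers that were emitted.
    public static List<string> Run()
    {
        var source = new Subject<int>();
        var flusher = new Subject<Unit>();
        var seen = new List<string>();

        using (source.BufferExceptOnFlush(flusher, 3)
            .Subscribe(b => seen.Add(string.Join(",", b))))
        {
            source.OnNext(1);
            source.OnNext(2);
            source.OnNext(3);             // third item: count-based close
            source.OnNext(4);
            flusher.OnNext(Unit.Default); // flush the partial buffer
            source.OnNext(5);
            source.OnNext(6);
            source.OnNext(7);             // count restarted after the flush
        }
        return seen;
    }

    public static void Main()
    {
        foreach (var b in Run())
            Console.WriteLine("[" + b + "]");
    }
}
```

If the extension behaves as intended, the buffers should come out as [1,2,3], [4] and [5,6,7]: the flush emits the partial buffer, and the count starts again from zero because a fresh closing sequence is created for each buffer.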