Force flush count-type Observable.Buffer c#

2019-08-09 00:48发布

问题:

Building on this question which discusses flushing a time-based buffer: Force flush to Observable.Buffer c#, I'm having difficulty working out how to translate this answer given there to the case where I'm buffering by count, not by time:

var subject = new Subject<Unit>();
var closing = Observable
    .Timer(new TimeSpan(0, 0, 1, 30))
    .Select(x => Unit.Default);

var query =
    mFluxObservable
        .Buffer(() => Observable
            .Amb(subject, closing)
            .Take(1));

I started using the same Amb logic, using an 'item counter' instead of Timer, but found myself going down the rabbit hole of trying to work out how to reset that.

Can you push me gently in the direction of working out how to implement my missing function?

var flusher = new Subject<Unit>();
var source = Observable.Interval(TimeSpan.FromSeconds(0.1));
var output = source.BufferExceptOnFlush(100, flusher);

My source is 'hot', if that helps...

PS: I could figure something out using Observable.Create and some kind of internal counter, but not without locking...

回答1:

I think you can do it by using the source in the closing observable and merging that with a flushing observable. The following worked for me:

 var source = new Subject<Unit>();
 var flush = new Subject<Unit>();

 // close buffer every 3 values or when a flush value arrives
 var closing = source.Buffer(3) 
            .Select(x => Unit.Default)
            .Merge(flush);

 var query = source.Buffer(() => closing)
         .Subscribe(Console.WriteLine);

// some test values
source.OnNext(Unit.Default);
source.OnNext(Unit.Default);
source.OnNext(Unit.Default);
source.OnNext(Unit.Default);

// flush buffer
flush.OnNext(Unit.Default);


回答2:

This is what I've got so far:

var flush = new Subject<Unit>();
var source = Observable.Interval(TimeSpan.FromSeconds(1))
            .Select(_ => Unit.Default).Publish().RefCount();
var closer = CloseGenerator(source, flush, 5);
source.Buffer(closer)

//...

private IObservable<Unit> CloseGenerator<T>(IObservable<T> source, 
                                             IObservable<Unit> flusher, int count)
{
     return Observable.CombineLatest(
                        source.Select((_, i) => i), 
                        flusher.Select((_, i) => i).StartWith(-1))
             .Select(ar => Tuple.Create(ar[0], ar[1]))
             .Scan(Tuple.Create(-1, -1), (prev, next) =>
                 {
                     if(next.Item2 != prev.Item2 || next.Item1 == prev.Item1 + count)
                         return next;
                     else
                         return prev;
                 }
             )
             .DistinctUntilChanged().Skip(1) //This is 'DistinctExceptFirst'
             .Select(_ => Unit.Default);
}


回答3:

I see nothing wrong with an Observable.Create<T> solution. In which case I think this extension should work

public static IObservable<IList<T>> BufferExceptOnFlush<T>(this IObservable<T> source,IObservable<Unit> flusher, int bufferSize)
{
 return Observable.Create<IList<T>>(observer =>
 {
     var shared = source.Publish();
     var closing = shared.Buffer(bufferSize).Select(x => Unit.Default);
     var query = shared.Buffer(() => flusher.Amb(closing).Take(1)).SubscribeSafe(observer);
     return new CompositeDisposable(query, shared.Connect());
 });

I have not tested it yet, but would enable usage like this

var query = myFluxObservable.BufferExceptOnFlush(myFlusher, 5);

It will work for both hot and cold observables as there is no double subscription