强制冲洗计数型Observable.Buffer C#(Force flush count-type

2019-10-22 17:45发布

建立在这个问题上,其中讨论了冲洗基于时间的缓冲: 强制刷新到Observable.Buffer C#中 ,我有困难的工作如何翻译这个答案给那里的地方,我通过缓冲算,而不是时间的情况下:

var subject = new Subject<Unit>();
var closing = Observable
    .Timer(new TimeSpan(0, 0, 1, 30))
    .Select(x => Unit.Default);

var query =
    mFluxObservable
        .Buffer(() => Observable
            .Amb(subject, closing)
            .Take(1));

我开始使用同一个Amb的逻辑,而不是使用定时器的“项目计数”,但发现自己下去的试图找出如何重置兔子洞。

你可以把我轻轻地在工作如何实现我的思念功能的方向是什么?

var flusher = new Subject<Unit>();
var source = Observable.Interval(TimeSpan.FromSeconds(0.1));
var output = source.BufferExceptOnFlush(100, flusher);

我的来源是“热”,是否有帮助?

PS:我可以用想办法的Observable.Create和某种内部计数器的,但也不是没有锁定...

Answer 1:

我想你可以通过使用关闭观察到的源和合并与冲洗观察到的做到这一点。 以下为我工作:

 var source = new Subject<Unit>();
 var flush = new Subject<Unit>();

 // close buffer every 3 values or when a flush value arrives
 var closing = source.Buffer(3) 
            .Select(x => Unit.Default)
            .Merge(flush);

 var query = source.Buffer(() => closing)
         .Subscribe(Console.WriteLine);

// some test values
source.OnNext(Unit.Default);
source.OnNext(Unit.Default);
source.OnNext(Unit.Default);
source.OnNext(Unit.Default);

// flush buffer
flush.OnNext(Unit.Default);


Answer 2:

这是我到目前为止有:

var flush = new Subject<Unit>();
var source = Observable.Interval(TimeSpan.FromSeconds(1))
            .Select(_ => Unit.Default).Publish().RefCount();
var closer = CloseGenerator(source, flush, 5);
source.Buffer(closer)

//...

private IObservable<Unit> CloseGenerator<T>(IObservable<T> source, 
                                             IObservable<Unit> flusher, int count)
{
     return Observable.CombineLatest(
                        source.Select((_, i) => i), 
                        flusher.Select((_, i) => i).StartWith(-1))
             .Select(ar => Tuple.Create(ar[0], ar[1]))
             .Scan(Tuple.Create(-1, -1), (prev, next) =>
                 {
                     if(next.Item2 != prev.Item2 || next.Item1 == prev.Item1 + count)
                         return next;
                     else
                         return prev;
                 }
             )
             .DistinctUntilChanged().Skip(1) //This is 'DistinctExceptFirst'
             .Select(_ => Unit.Default);
}


Answer 3:

我认为没有错与Observable.Create<T>的解决方案。 在这种情况下,我想这应该延长工作

public static IObservable<IList<T>> BufferExceptOnFlush<T>(this IObservable<T> source,IObservable<Unit> flusher, int bufferSize)
{
 return Observable.Create<IList<T>>(observer =>
 {
     var shared = source.Publish();
     var closing = shared.Buffer(bufferSize).Select(x => Unit.Default);
     var query = shared.Buffer(() => flusher.Amb(closing).Take(1)).SubscribeSafe(observer);
     return new CompositeDisposable(query, shared.Connect());
 });

我没有测试过,但将使这样的用法

var query = myFluxObservable.BufferExceptOnFlush(myFlusher, 5);

它将为热和冷的观测工作,因为没有双订阅



文章来源: Force flush count-type Observable.Buffer c#