buffer while processing items

Posted 2019-02-18 06:18

Question:

I have an event that fires regularly. Let's assume that processing an event takes ~1s. Instead of waiting 1s for each received event, I want to accumulate events until the current processing run is done. When it finishes, I want to process the event data I received in the meantime:

e1   e2   e3                                                            e4   e5   e6                 e7                                              events happening   
---------------------------------------------------------------------------------------------------------------------------------------------------> time
                         1s                      2s                     3s                       4s                       5s                      6s
p(e1)                    p(e2, e3)                                      p(e4)                    p(e5, e6)                p(e7)
[-----------------------][-----------------------]                      [-----------------------][-----------------------][-----------------------]  processing of items                        

In the above example, processing starts as soon as e1 happens. While that processing takes place, 2 more events arrive. They should be stored so that when p(e1), which means the processing of e1,
is finished, the processing of events e2 and e3 takes place.

This process is similar to a rolling build: a changeset is checked in, the build server starts building, and once the build is finished all changesets that were
checked in during the build are processed together.

How should I do that using Rx?

I have tried using the Buffer combined with an opening and closing selector but I can't get it right. Any examples or direction are appreciated!

Let's assume a Subject<int> as the input stream.

I have tried something like this but I am totally lost.

var observer1 = input
.Buffer(bc.Where(open => open), _ => bc.Where(open => !open))
.Subscribe(ev =>
{
    bc.OnNext(true);
    String.Format("Processing items {0}.", string.Join(", ", ev.Select(e => e.ToString())).Dump());
    Thread.Sleep(300);
    bc.OnNext(false);
});

Answer 1:

This is non-trivial. Fortunately @DaveSexton has already done all the hard work. You want BufferIntrospective from the Rxx library. Check out its source in the Rxx project.

This is hard because IObserver<T> has no built-in means of signalling back-pressure, other than the subtlety that OnXXX invocations block until the observer returns. The Observable needs to pay attention to the Observer, and you need to introduce concurrency to manage the buffering.
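To make the "blocking of OnXXX invocations" point concrete, here is a minimal sketch, assuming the System.Reactive package is referenced. The class and method names (BlockingOnNextDemo, TimeProducer) are made up for illustration. It times a producer that pushes into a Subject<int> whose only observer sleeps for each item:

```csharp
using System;
using System.Diagnostics;
using System.Reactive.Subjects;
using System.Threading;

// Hypothetical demo class; not part of Rx or Rxx.
public static class BlockingOnNextDemo
{
    // Measures how long `count` OnNext calls take when the single observer
    // sleeps for `workPerItem` on every item.
    public static TimeSpan TimeProducer(int count, TimeSpan workPerItem)
    {
        using (var subject = new Subject<int>())
        using (subject.Subscribe(_ => Thread.Sleep(workPerItem)))
        {
            var sw = Stopwatch.StartNew();
            for (var i = 0; i < count; i++)
            {
                // Subject<T> calls its observers synchronously, so this call
                // does not return until the handler above has finished.
                subject.OnNext(i);
            }
            return sw.Elapsed;
        }
    }
}
```

Calling BlockingOnNextDemo.TimeProducer(3, TimeSpan.FromMilliseconds(100)) should take roughly three times the per-item delay, because the producer is held up by the consumer. That implicit coupling is the only "back-pressure" you get for free, and it is why buffering while processing needs a second thread of execution.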

Also note that if you have multiple subscribers, each one will see different buffers, because what a subscriber receives depends on both the source event rate and its own consumption rate.

Another approach is to just add all the events to a thread-safe queue in your OnNext handler, and have a separate task that empties the queue in a loop. BufferIntrospective is probably cleaner though.
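The queue-based alternative could look something like the sketch below. BatchingWorker is a hypothetical helper written for this answer, not something from Rx or Rxx, and it uses only types from the .NET base class library. Each call to Add enqueues an item. A background task wakes up, drains everything that has accumulated, and hands it to the callback as one batch:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper; the name and shape are assumptions for illustration.
public sealed class BatchingWorker<T> : IDisposable
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private readonly Task _loop;

    public BatchingWorker(Action<IList<T>> process)
    {
        _loop = Task.Run(async () =>
        {
            try
            {
                while (true)
                {
                    // Sleep until at least one Add has happened.
                    await _signal.WaitAsync(_cts.Token);

                    // Drain everything that arrived while we were busy.
                    var batch = new List<T>();
                    while (_queue.TryDequeue(out var item))
                    {
                        batch.Add(item);
                    }

                    // The semaphore is released once per item, so a wake-up
                    // can find the queue already emptied by an earlier drain.
                    if (batch.Count > 0)
                    {
                        process(batch);
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Dispose was called; stop the loop.
            }
        });
    }

    // Safe to call from any thread, e.g. from an OnNext handler.
    public void Add(T item)
    {
        _queue.Enqueue(item);
        _signal.Release();
    }

    // Stops the loop after the batch in progress. Items still queued are dropped,
    // and an exception thrown by the callback surfaces here as AggregateException.
    public void Dispose()
    {
        _cts.Cancel();
        _loop.Wait();
        _cts.Dispose();
        _signal.Dispose();
    }
}
```

With the Subject<int> from the question you would wire it up as input.Subscribe(worker.Add). OnNext then returns immediately, and the batch sizes fall out of how long the callback takes, which is the same effect the buffering operator aims for.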

Had a little play, and this toy implementation seems to work. But Rxx will be more robust, so this is just pedagogical really to show what sort of thing is involved. The key is the introduction of concurrency via the scheduler.

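using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// The class name is arbitrary; extension methods just need a static class to live in.
public static class ObservableExtensions
{
    public static IObservable<IList<TSource>> BufferIntrospective<TSource>(
        this IObservable<TSource> source,
        IScheduler scheduler = null)
    {
        scheduler = scheduler ?? Scheduler.Default;
        return Observable.Create<IList<TSource>>(o => {
            // Each feedback signal closes the current buffer and opens the next one.
            Subject<Unit> feedback = new Subject<Unit>();
            var sourcePub = source.Publish().RefCount();
            // ObserveOn moves delivery onto the scheduler, so the source keeps
            // filling the next buffer while the observer processes this one.
            var sub = sourcePub.Buffer(
                () => feedback).ObserveOn(scheduler).Subscribe(@event =>
                {
                    o.OnNext(@event);
                    // The observer has returned: release whatever accumulated meanwhile.
                    feedback.OnNext(Unit.Default);
                },
                o.OnError,
                o.OnCompleted);
            // The first event closes the initial buffer, which kicks off the loop.
            var start = sourcePub.Take(1).Subscribe(_ => feedback.OnNext(Unit.Default));
            return new CompositeDisposable(sub, start);
        });
    }
}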

This sample code shows the usage and how two differently paced subscribers get different buffering of events, one receiving batches of 5, the other batches of 10.

I am using LINQPad's Dump to show the contents of each buffer easily.

var xs = Observable.Interval(TimeSpan.FromSeconds(0.2)).Take(30);

var buffered = xs.BufferIntrospective();

buffered.Subscribe(x => {
    x.Dump();
    Task.Delay(TimeSpan.FromSeconds(1)).Wait();
});

buffered.Subscribe(x => {
    x.Dump();
    Task.Delay(TimeSpan.FromSeconds(2)).Wait();
});