I have an event that fires regularly. Let's assume that processing one event takes ~1s. Instead of blocking for 1s on each received event, I want to accumulate incoming events while the current processing run is still going. When that run is done, I want to process, as one batch, all the event data that arrived in the meantime:
e1  e2  e3               e4  e5  e6     e7                      events happening
--------------------------------------------------------------> time
          1s        2s        3s        4s        5s        6s
p(e1)     p(e2, e3)      p(e4)     p(e5, e6) p(e7)
[--------][--------]     [--------][--------][--------]         processing of items
In the above example, processing starts as soon as e1 happens. While that processing takes place, two more events arrive. They should be stored so that when p(e1) (the processing of e1) is finished, the events e2 and e3 are processed together.
This process is similar to a rolling build: a changeset is checked in, the build server starts building, and once the build is finished, all changesets that were checked in during the build are processed together.
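To make the intended batching concrete, here is a minimal sketch in plain C# (no Rx) that simulates the timeline above. The `Simulate` helper and the arrival times are made up for illustration; only the 1s processing time comes from the description. It only shows which events should end up in which batch, not how to do this with Rx.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class BatchingSketch
{
    // Hypothetical helper: given sorted arrival times (in seconds) and a fixed
    // processing duration, returns the 1-based event numbers in each batch.
    public static List<List<int>> Simulate(double[] arrivals, double duration)
    {
        var batches = new List<List<int>>();
        double freeAt = 0; // time at which the processor becomes idle again
        int next = 0;      // index of the first event not yet assigned to a batch

        while (next < arrivals.Length)
        {
            // Start as soon as the processor is free; if nothing is waiting,
            // start when the next event arrives.
            double start = Math.Max(freeAt, arrivals[next]);

            // Everything that has arrived by the start time goes into one batch.
            var batch = new List<int>();
            while (next < arrivals.Length && arrivals[next] <= start)
            {
                batch.Add(next + 1);
                next++;
            }

            batches.Add(batch);
            freeAt = start + duration;
        }

        return batches;
    }

    static void Main()
    {
        // Made-up arrival times for e1..e7, chosen to resemble the diagram.
        var arrivals = new[] { 0.0, 0.4, 0.8, 2.5, 2.9, 3.3, 4.0 };

        foreach (var batch in Simulate(arrivals, 1.0))
            Console.WriteLine("p(" + string.Join(", ", batch.Select(i => "e" + i)) + ")");
        // prints: p(e1), p(e2, e3), p(e4), p(e5, e6), p(e7), one per line
    }
}
```

With these assumed times, e4 arrives while nothing is being processed, so p(e4) starts immediately instead of waiting for a previous run to finish.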
How should I do that using Rx?
I have tried using Buffer with an opening and a closing selector, but I can't get it right. Any examples or pointers are appreciated!
Let's assume a Subject<int> as the input stream.
I have tried something like this but I am totally lost.
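// Note: bc is not declared in this snippet; it is assumed to be a Subject<bool>
// that signals true when processing starts and false when it ends.
var observer1 = input
    .Buffer(bc.Where(open => open), _ => bc.Where(open => !open))
    .Subscribe(ev =>
    {
        bc.OnNext(true);
        String.Format("Processing items {0}.", string.Join(", ", ev.Select(e => e.ToString()))).Dump();
        Thread.Sleep(300); // simulate the processing time
        bc.OnNext(false);
    });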