无扩展功能:批量处理事件+每批次之间添加延迟(Reactive Extensions: Proces

2019-07-29 18:46发布

我有一个在某些点上提出了1000个事件几乎在同一时间的应用程序。 我想这样做是为了批事件的50项块,并开始处理它们每10秒。 有没有需要等待一个批次开始新的批处理之前完成。

例如:

10:00:00: 10000 new events received
10:00:00: StartProcessing (events.Take(50))
10:00:10: StartProcessing (events.Skip(50).Take(50))
10:00:15: StartProcessing (events.Skip(100).Take(50))

任何想法如何实现这一目标? 我想反应扩展是要走的路,但其他的解决方案是可以接受的了。

我试图从这里开始:

        var bufferedItems = eventAsObservable
            .Buffer(15)
            .Delay(TimeSpan.FromSeconds(5)

但注意到,像我希望的,而是所有批次同时启动,虽然5秒延迟延迟没有工作。

我还测试了窗口法,但我没有发现任何行为上的差异。 我想在窗口时间跨度实际上意味着“采取一切恰好在接下来的10秒事件:

        var bufferedItems = eventAsObservable
            .Window(TimeSpan.FromSeconds(10), 5)
            .SelectMany(x => x)
            .Subscribe(DoProcessing);

我使用的RX-主要2.0.20304-β。

Answer 1:

如果你不想睡觉线程,你可以这样做:

var tick = Observable.Interval(TimeSpan.FromSeconds(5));

eventAsObservable
.Buffer(50)
.Zip(tick, (res, _) => res)
.Subscribe(DoProcessing);


Answer 2:

试试这个:

    var bufferedItems = eventAsObservable
        .Buffer(50)
        .Do(_ => { Thread.Sleep(10000); });


Answer 3:

只是有这个特定的缓冲方法重载: https://msdn.microsoft.com/en-us/library/hh229200(v=vs.103).aspx

observable.Buffer(TimeSpan.FromSeconds(5), 50);


文章来源: Reactive Extensions: Process events in batches + add delay between every batch