Reactive Extensions Parallel processing based on s

Posted 2019-04-02 14:46

Question:

I'm new to Reactive Extensions. I have a collection of objects and need to call a method on each one; the method returns a Boolean. Instead of iterating with a foreach loop and calling the method on one object at a time, is there a way in Reactive Extensions to call the method concurrently (fork and join) for a fixed number of objects, say 5 at a time? As soon as the first call finishes, the call for the 6th object should start, and so on until all the objects have been processed.

I appreciate your response.

Answer 1:

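// Assumed signature of the method that is called for each object:
// IObservable<bool> someBoolAsyncMethod(SomeObject o)

someCollection.ToObservable()
    // Defer delays each call until Merge actually subscribes to it
    .Select(x => Observable.Defer(() =>
        someBoolAsyncMethod(x).Select(y => new { Item = x, Result = y })))
    .Merge(5)   // at most 5 calls in flight; the next starts when one completes
    .ToList()   // emits one list after every call has finished
    // the list holds { Item, Result } pairs, in completion order
    .Subscribe(results => { /* use results here */ });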
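
For anyone who wants to try the Merge(5) approach end to end, here is a minimal console sketch. It makes several assumptions that are not in the question: the items are plain ints, the per-object method is a hypothetical Task-returning `CheckAsync` that just waits 100 ms, and the System.Reactive NuGet package is referenced. The `Peak` counter exists only so the concurrency cap can be checked.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;   // NuGet package: System.Reactive
using System.Threading.Tasks;

static class ThrottledCalls
{
    static readonly object Gate = new object();
    static int inFlight;
    public static int Peak;   // highest number of simultaneous calls observed

    // Hypothetical stand-in for the real per-object method.
    static async Task<bool> CheckAsync(int item)
    {
        lock (Gate) { inFlight++; Peak = Math.Max(Peak, inFlight); }
        await Task.Delay(100);            // simulate slow work
        lock (Gate) { inFlight--; }
        return item % 2 == 0;
    }

    // Calls CheckAsync for every item, with at most maxConcurrent calls in flight.
    public static IList<(int Item, bool Result)> RunAll(
        IEnumerable<int> items, int maxConcurrent)
    {
        return items.ToObservable()
            // FromAsync is lazy: the task starts only when Merge subscribes to it.
            .Select(x => Observable.FromAsync(() => CheckAsync(x))
                                   .Select(ok => (Item: x, Result: ok)))
            .Merge(maxConcurrent)
            .ToList()
            .Wait();   // blocks until everything is done; acceptable in a console demo
    }

    static void Main()
    {
        var results = RunAll(Enumerable.Range(1, 12), 5);
        // Results arrive in completion order, so sort before printing.
        foreach (var pair in results.OrderBy(p => p.Item))
            Console.WriteLine($"{pair.Item}: {pair.Result}");
        Console.WriteLine($"Peak concurrency: {Peak}");
    }
}
```

Because each result carries its Item, the original order can be restored after the fact, as the sort in Main does. In a UI or server application, subscribing or awaiting the observable would likely be a better fit than the blocking Wait() call.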