Wait for multiple async calls to finish in RxJava

Posted 2019-06-24 02:20

My scenario is quite simple, but I can't seem to find it covered anywhere.

I have a set of elements that I want to iterate over, calling an async function for each one, and then wait for all of the calls to finish (completion is itself asynchronous and is implemented inside the function). I'm relatively new to RxJava; in NodeJS I used to do this easily by passing a callback to the function and waiting at the end. Here's pseudocode for what I need (the iteration over the elements does not need to be synchronous or ordered):

for(line in lines){
 callAsyncFunction(line);
}
WAIT FOR ALL OF THEM TO FINISH

Your help is really appreciated!

3 answers
乱世女痞
#2 · 2019-06-24 02:44

Use Rx:

Observable
.from(lines)
.flatMap(line -> callAsyncFunctionThatReturnsObservable(line).subscribeOn(Schedulers.io()))
.ignoreElements();

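Because ignoreElements() drops every item, the resulting observable only signals completion once all the calls have finished. At this point, depending on what you want to do, you can use .switchIfEmpty(...) to subscribe to another observable.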

Summer. ? 凉城
#3 · 2019-06-24 02:45

Convert the Iterable of "items to compute" into an Iterable of Observables using defer, and then zip on the Observables.

The more difficult part will be "waiting for them all to finish". As you may have guessed, Reactive Extensions are about "reacting" to stuff, not "waiting for stuff to happen." You can subscribe to the zipped observable, which emits a single item and then completes if each source observable emits only one item. That subscriber can do whatever you would normally do after the wait; this lets you return immediately and let your code do its thing without blocking.
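
For comparison, the same "react instead of wait" idea can be sketched with the JDK's own CompletableFuture rather than RxJava. This is only an analogue under stated assumptions, not RxJava code: the class WaitForAll, the method processAll, and the body of callAsyncFunction (which simply returns the length of the line) are made up for illustration. CompletableFuture.allOf plays roughly the role that zip plays in the approach above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class WaitForAll {

    // Hypothetical stand-in for the question's callAsyncFunction(line):
    // it starts the work on another thread and returns immediately.
    static CompletableFuture<Integer> callAsyncFunction(String line) {
        return CompletableFuture.supplyAsync(line::length);
    }

    // Starts one async call per line and returns a future that completes
    // only after every call has finished.
    static CompletableFuture<List<Integer>> processAll(List<String> lines) {
        List<CompletableFuture<Integer>> pending = lines.stream()
                .map(WaitForAll::callAsyncFunction)
                .collect(Collectors.toList());

        return CompletableFuture
                .allOf(pending.toArray(new CompletableFuture[0]))
                // Every future is already complete here, so join() does not block.
                .thenApply(done -> pending.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a", "bb", "ccc");

        // Reacting: register the follow-up action instead of waiting for it.
        CompletableFuture<Void> reaction =
                processAll(lines).thenAccept(r -> System.out.println("all done: " + r));

        // Blocking: only at the edge of the program, so the JVM does not exit early.
        reaction.join(); // prints "all done: [1, 2, 3]"
    }
}
```

The follow-up work lives in thenAccept, so nothing blocks while the calls are running; the single join() in main is only there because a command-line program has to stay alive until the callback has fired.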

做个烂人
#4 · 2019-06-24 02:54

Well, technically, if you think about it, what you need to do is create an Observable for each of your elements, and then zip them together to continue your stream's execution.

That would in pseudocode give you something like this:

List<Observable<?>> observables = new ArrayList<>();
for(line in lines){
   observables.add(Observable.fromCallable(callAsyncFunction(line))); // callAsyncFunction returns a Callable
}
Observable.zip(observables, new Function<...>() { ... }); // kinda like Promise.all()

But it might come as no surprise that Observable.from() can expose every element of an Iterable as a stream of objects, which eliminates the need for a loop. So you could create, for each element, a new Observable that calls onCompleted() when the async operation is complete, using Observable.fromCallable(). Afterwards, you can wait for these new Observables by collecting their results into a list.

Observable.from(lines)
   .flatMap(new Func1<String, Observable<?>>() {
        @Override
        public Observable<?> call(String line) {
            return Observable.fromCallable(callAsyncFunction(line)); // returns Callable
        }
    }).toList()
      .map(new Func1<List<Object>, Object>() {
        @Override
        public Object call(List<Object> results) {
            // Runs once, after every call has completed; do something here.
            return results;
        }
    });

I'm basing this second half of my answer heavily on this answer.
