My scenario is quite simple, but I don't seem to be able to find it anywhere.
I have a set of elements that I want to iterate through, call an async function for each one, and then wait for all of them to finish (which again happens asynchronously, implemented in the logic of the function). I'm relatively new to RxJava; I used to do this easily in NodeJS by passing a callback to the function and waiting at the end. Here's the pseudocode of what I need (the iteration over the elements does not need to be synchronous or ordered):
for(line in lines){
callAsyncFunction(line);
}
WAIT FOR ALL OF THEM TO FINISH
Your help is really appreciated!
Use Rx:
At this point, depending on what you want to do, you can use .switchIfEmpty(...) to subscribe to another observable.
Convert the Iterable of "items to compute" into an Iterable of Observables using defer, and then zip on the Observables.
The more difficult part will be "waiting for them all to finish". As you may have guessed, Reactive Extensions are about "reacting" to stuff, not "waiting for stuff to happen." You can subscribe to the zipped observable, which emits a single item and then completes, provided each source observable emits exactly one item. That subscriber can do whatever you would normally do after the wait; this lets you return immediately and let your code do its thing without blocking.
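As a minimal sketch of the defer-then-zip idea, assuming RxJava 1.x (the rx.Observable package) and Java 8 lambdas: process is a hypothetical stand-in for callAsyncFunction(line), and the class name is made up for illustration. The subscriber reacts once every deferred call has produced its item; the CountDownLatch is there only so the demo JVM does not exit before the background work finishes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import rx.Observable;
import rx.schedulers.Schedulers;

public class ZipAllExample {

    // Hypothetical stand-in for the asker's callAsyncFunction(line).
    static String process(String line) {
        return line.toUpperCase();
    }

    // Builds one deferred Observable per line and zips them into a single result.
    static Observable<List<String>> processAll(List<String> lines) {
        List<Observable<String>> tasks = new ArrayList<>();
        for (String line : lines) {
            // defer: process(line) runs only when the Observable is subscribed to;
            // subscribeOn moves that work onto an I/O thread.
            tasks.add(Observable.defer(() -> Observable.just(process(line)))
                    .subscribeOn(Schedulers.io()));
        }
        // zip emits once, after every source has emitted its single item.
        // The results arrive in the same order as the sources.
        return Observable.zip(tasks, results -> {
            List<String> out = new ArrayList<>();
            for (Object r : results) {
                out.add((String) r);
            }
            return out;
        });
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1); // demo only: keeps main alive

        processAll(Arrays.asList("a", "b", "c")).subscribe(
                all -> System.out.println("All finished: " + all),
                error -> { error.printStackTrace(); done.countDown(); },
                done::countDown);

        done.await();
    }
}
```

In real code the latch would normally be unnecessary: whatever should happen "after the wait" goes inside the subscriber instead.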
Well technically if you think about it, what you need to do is create an Observable from all of your elements, and then zip them together to continue your stream's execution.
That would in pseudocode give you something like this:
But it might come as no surprise that Observable.from() can expose every element within an iterable as a stream of objects, thus eliminating your need for a loop. So you could create a new Observable that calls onCompleted() when the async operation is complete, using Observable.fromCallable(). Afterwards, you can wait for these new Observables by collecting them into a list.
I'm basing this second half of my answer heavily on this answer.
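A rough sketch of that approach, again assuming RxJava 1.x and Java 8: process is a hypothetical placeholder for the real async work, and the class name is invented for the example. Observable.from() replaces the loop, fromCallable() wraps each call so that it completes when the work is done, and toList() emits once after all of the inner Observables have completed.

```java
import java.util.Arrays;
import java.util.List;

import rx.Observable;
import rx.schedulers.Schedulers;

public class FlatMapAllExample {

    // Hypothetical placeholder for the work done per element.
    static Integer process(String line) {
        return line.length();
    }

    static Observable<List<Integer>> processAll(List<String> lines) {
        return Observable.from(lines)
                // One inner Observable per element, each running on an I/O thread.
                // flatMap merges them, so result order is not guaranteed.
                .flatMap(line -> Observable.fromCallable(() -> process(line))
                        .subscribeOn(Schedulers.io()))
                // Emits a single list once every inner Observable has completed.
                .toList();
    }

    public static void main(String[] args) {
        // toBlocking().single() blocks the calling thread until the list arrives.
        // Use it only where the caller really has to wait; otherwise subscribe.
        List<Integer> results = processAll(Arrays.asList("one", "three", "fifteen"))
                .toBlocking()
                .single();
        System.out.println(results.size() + " calls finished");
    }
}
```

If the results have to come back in the original order, concatMap could be used in place of flatMap, at the cost of running the calls one after another.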