Basically, I have two Flowables F
and G
and I want to use combineLatest
on them, but I want the combined Flowable
to already complete if F
completes (even if G
is still running).
Here is an example of what I what to achieve with an ugly solution:
fun combineFandGbutTerminateIfFTerminates(F: Flowable<Int>, G: Flowable<Int>) : Flowable<Pair<Int, Int>> {
val _F = F.share()
val _G = G.takeUntil(_F.ignoreElements().toFlowable<Nothing>())
val FandG = Flowables.combineLatest(_F, _G)
return FandG
}
We can extract that into and extension function:
fun<T> Flowable<T>.completeWith(other: Flowable<*>) : Flowable<T> {
return takeUntil(other.ignoreElements().toFlowable<Nothing>())
}
Is there a nicer way to express that?
I came to the following solution. It allows to combine one master with many slave sources. If the master completes, the combined
Flowable
completes. However, if a slave completes before the master, an errorSlaveCompletedPrematurelyError
is propagated.