RxJava Observable.create wrapping observable subsc

Posted 2019-09-16 06:06

Question:

I used Observable.create so I could notify the subscriber when certain data was available. I am a little uncertain of subscribing to observables inside of my create method. Are these nested subscriptions going to give me any sort of issue? I'm not completely familiar with creating observables using Observable.create so I wanted to make sure I'm not doing anything out of the ordinary or misusing it. Thank you in advance!

abstract class NetworkResource<ApiType, DbType> constructor(private val schedulerProvider: SchedulerProvider) {

    abstract fun fetchFromApi(): Single<ApiType>
    abstract fun fetchFromDb(): Observable<Optional<DbType>>
    abstract fun saveToDb(apiType: ApiType?)
    abstract fun shouldFetchFromApi(cache: DbType?): Boolean

    fun fetch(): Observable<Optional<DbType>>  {
        return Observable.create<Optional<DbType>> {
            val subscriber = it

            fetchFromDb()
                    .subscribe({
                        subscriber.onNext(it)

                        if(shouldFetchFromApi(it.get())) {
                            fetchFromApi()
                                    .observeOn(schedulerProvider.io())
                                    .map {
                                        saveToDb(it)
                                        it
                                    }
                                    .observeOn(schedulerProvider.ui())
                                    .flatMapObservable {
                                        fetchFromDb()
                                    }
                                    .subscribe({
                                        subscriber.onNext(it)
                                        subscriber.onComplete()
                                    })
                        }
                        else {
                            subscriber.onComplete()
                        }
                    })

        }
    }
}

Answer 1:

Yes, it will cause issues.

First, nesting Observables like this is not idiomatic. One of the strengths of the reactive approach is composing Observables into a single, clean stream. By subscribing inside create you break the chain; the immediate result is intertwined code that is harder to read, plus extra code to wire up the notification events. Essentially, it is like wrapping async callback methods in an Observable.
Since your components are already reactive, you can simply compose them instead of treating them as callbacks.

Second, as a result of breaking the chain, the most severe and immediate problem is that unsubscribing from the outer Observable will not automatically affect the inner Observable. The same goes for adding subscribeOn(), and it also applies in scenarios where backpressure matters.
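To make the disposal problem concrete, here is a minimal sketch, assuming RxJava 2 (io.reactivex) is on the classpath. The helper names leaky and linked are hypothetical, and a PublishSubject stands in for an inner source such as fetchFromDb(). It also shows one way to link the inner subscription through emitter.setDisposable() if create really cannot be avoided, though composing operators is still the preferable route.

```kotlin
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject

// Nested subscription as in the question: the Disposable returned by
// inner.subscribe(...) is dropped, so nothing ties it to the outer stream.
fun leaky(inner: Observable<Int>): Observable<Int> =
    Observable.create<Int> { emitter ->
        inner.subscribe { emitter.onNext(it) }
    }

// Same shape, but the inner Disposable is handed to the emitter, so
// disposing the outer subscription also disposes the inner one.
fun linked(inner: Observable<Int>): Observable<Int> =
    Observable.create<Int> { emitter ->
        emitter.setDisposable(
            inner.subscribe(
                { emitter.onNext(it) },
                { emitter.onError(it) },
                { emitter.onComplete() }
            )
        )
    }

fun main() {
    val a = PublishSubject.create<Int>()
    val outerA = leaky(a).subscribe()
    outerA.dispose()
    println(a.hasObservers()) // true: the inner subscription is still alive

    val b = PublishSubject.create<Int>()
    val outerB = linked(b).subscribe()
    outerB.dispose()
    println(b.hasObservers()) // false: disposal reached the inner subscription
}
```

The leaky variant also has no error handler on the inner subscription, so an error from the inner source would never reach the outer subscriber.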

A composed alternative might look like this:

fun fetch2(): Observable<Optional<DbType>> {
    return fetchFromDb()
            .flatMap {
                if (shouldFetchFromApi(it.get())) {
                    fetchFromApi()
                            .observeOn(schedulerProvider.io())
                            .doOnSuccess { saveToDb(it) }
                            .observeOn(schedulerProvider.ui())
                            .flatMapObservable {
                                fetchFromDb()
                            }
                } else {
                    Observable.empty()
                }
            }
}

If, for some reason, you want the first fetchFromDb() result to be emitted separately in every case, you can do that using publish() with a selector:

fun fetch2(): Observable<Optional<DbType>> {
    return fetchFromDb()
            // 'shared' is a single subscription to fetchFromDb(), used twice below
            .publish { shared ->
                Observable.merge(shared,
                        shared.flatMap {
                            if (shouldFetchFromApi(it.get())) {
                                fetchFromApi()
                                        .observeOn(schedulerProvider.io())
                                        .doOnSuccess { saveToDb(it) }
                                        .observeOn(schedulerProvider.ui())
                                        .flatMapObservable {
                                            fetchFromDb()
                                        }
                            } else {
                                Observable.empty()
                            }
                        })
            }
}
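
The reason for reaching for publish() with a selector, rather than merging fetchFromDb() with itself directly, is that the selector variant subscribes to the source only once and shares it between both branches. A small sketch of that difference, assuming RxJava 2; countSubscriptions and the Observable.just(1, 2) source are hypothetical stand-ins for illustration only:

```kotlin
import io.reactivex.Observable
import java.util.concurrent.atomic.AtomicInteger

// Counts how many times the upstream source gets subscribed when the
// original items are merged with a stream derived from them.
fun countSubscriptions(usePublish: Boolean): Int {
    val subscriptions = AtomicInteger()
    val source = Observable.just(1, 2)
        .doOnSubscribe { subscriptions.incrementAndGet() }

    val stream: Observable<Int> =
        if (usePublish) {
            // One upstream subscription, shared by both branches.
            source.publish { shared ->
                Observable.merge(shared, shared.flatMap { Observable.just(it * 10) })
            }
        } else {
            // Each branch subscribes to the source on its own.
            Observable.merge(source, source.flatMap { Observable.just(it * 10) })
        }

    stream.toList().blockingGet()
    return subscriptions.get()
}

fun main() {
    println(countSubscriptions(usePublish = false)) // 2
    println(countSubscriptions(usePublish = true))  // 1
}
```

For a database-backed source, a second subscription would typically mean a second query, which is what the shared version avoids.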