I have an app with offline functionality. It requests data from a repository, which reads from a local database and makes an API request. The API call updates the local database so that, when the app is offline, the data stored in the database is still displayed.
My ProjectRepository class has a function getAllProjectByUserId, which returns an Observable that combines the output of two sources (the database and the API call):
fun getAllProjectByUserId(userId: Int): Observable<Projects> {
// concatArrayEager subscribes to both sources at once, but still delivers the db entry to subscribers first and the api data second
return Observable.concatArrayEager(
getAllProjectsByUserIdFromDb(userId),
getAllProjectsByUserIdFromApi(userId))
}
This function is called in the DataProvider:
fun getAllProjectByUserId(userId: Int, fetchDataErrorListener: FetchDataErrorListener): Observable<Projects> = projectRepository.getAllProjectByUserId(userId).handleErrorFilterAndDebounce(fetchDataErrorListener)
When data from both sources is requested, the DataProvider makes sure that only emissions that are not onError notifications are passed on (see the .filter operator), and that the database emission is debounced (i.e. dropped) if the API emission arrives within API_FETCH_DEBOUNCE_TIMEOUT (300 ms in my case).
The code that takes care of this looks like:
private fun <T> Observable<T>.handleErrorFilterAndDebounce(onRepoErrorListener: FetchDataErrorListener): Observable<T> {
return this
// materialize wraps the observed object types into a Notification object on which we can check
// whether the onNext, onError and/or onComplete methods are called. This is used in map function
// to check if an error occurred and invoke error callback in presenter
.materialize()
// since the map operator can hit an error that triggers a callback handling a UI event, we need to make sure
// the error is handled on the main thread (observeOn)
.observeOn(AndroidSchedulers.mainThread())
.map {
// if the observable's onError is called, invoke the callback so that presenters can handle the error
it.error?.let {
Logging.logError(CLASS_TAG, " error localized message: " + it.localizedMessage + " cause.throwable=" + it.cause)
handleErrorCallback(it, onRepoErrorListener)
}
// put item back into stream
it
}
.filter {
Logging.logError(CLASS_TAG, " it.isOnError = " + it.isOnError)
// Only return observable on which onError is not called.
// This way if api call returns an error (within debounce timeout) the database item won't be ignored
// and will be received by all subscribers
!it.isOnError
}
// reverses the effect of .materialize()
.dematerialize<T>()
//Drop DB data if we can fetch item from API fast enough to avoid UI flicker using the specified timeout
.debounce(API_FETCH_DEBOUNCE_TIMEOUT, MILLISECONDS)
}
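To show what the materialize/filter/dematerialize part does in isolation, here is a minimal sketch, assuming RxJava 2.x on the classpath. The helper name dropErrors and the sample sources are hypothetical, and the error callback, observeOn and debounce steps are left out on purpose:

```kotlin
import io.reactivex.Observable
import java.io.IOException

// Hypothetical helper (not part of the app): removes the onError notification
// from the stream, so the subscriber sees a normal completion instead.
fun <T> Observable<T>.dropErrors(): Observable<T> =
    this.materialize()               // wrap onNext/onError/onComplete into Notification objects
        .filter { !it.isOnError }    // drop the notification that carries the error
        .dematerialize<T>()          // unwrap the remaining notifications again

fun main() {
    // Stand-ins for the real sources: a db result followed by a failing api call
    val source = Observable.concat(
        Observable.just("db item"),
        Observable.error<String>(IOException("no network"))
    )

    val observer = source.dropErrors().test()

    println(observer.values())      // [db item]
    println(observer.errorCount())  // 0
}
```

This only works as long as the db item reaches materialize before the error does, which is the case here because the two sources run strictly one after the other.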
Now here's my problem:
I would like to use .concatArrayEager instead of .concatArray, since concatArrayEager starts both source requests at the same time rather than consecutively, so the whole operation should take less time.
But: suppose there is no network connection and I do have data stored in the local database. When I use .concatArray, subscribing to the observable returned by DataProvider.getAllProjectByUserId() does in fact give me the data from the database.
If I change the operator to .concatArrayEager instead of .concatArray, no data seems to be emitted, i.e. the subscriber does not receive any data. Is this an RxJava bug, or am I missing something here?
concatArrayEager or concatArray will terminate if one of the sources encounters an error. Instead, you should use concatArrayDelayError, as per the documentation.
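A minimal sketch of the difference, assuming RxJava 2.2.1 or later (where, as far as I know, the eager counterpart concatArrayEagerDelayError is available). The db() and api() functions are stand-ins I made up, not the repository code from the question: db() is delayed to mimic an asynchronous database query, and api() fails immediately, as it would without a network connection.

```kotlin
import io.reactivex.Observable
import java.io.IOException
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for the database source: emits after a short delay
fun db(): Observable<String> =
    Observable.just("db item").delay(100, TimeUnit.MILLISECONDS)

// Hypothetical stand-in for the api source: fails right away (offline)
fun api(): Observable<String> =
    Observable.error<String>(IOException("no network"))

fun main() {
    // Both sources are subscribed at once; the api error is signalled
    // immediately, before the db item has had a chance to arrive.
    val eager = Observable.concatArrayEager(db(), api()).test()
    eager.awaitTerminalEvent()
    println("eager:      values=${eager.values()} errors=${eager.errorCount()}")
    // eager:      values=[] errors=1

    // Still eager, but the error is held back until all sources have terminated.
    val delayed = Observable.concatArrayEagerDelayError(db(), api()).test()
    delayed.awaitTerminalEvent()
    println("delayError: values=${delayed.values()} errors=${delayed.errorCount()}")
    // delayError: values=[db item] errors=1
}
```

With the error delayed to the end, the db item reaches materialize first, so the filter in handleErrorFilterAndDebounce can drop the error notification and the db item should still come out of debounce when the stream completes. If your RxJava version does not have concatArrayEagerDelayError, concatArrayDelayError gives the same ordering guarantee, only without the eager subscription.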