I'm using RxJava to iterate over a list of files, making a network call to upload each file, then collecting the successfully uploaded files in a list and persisting them in the subscriber's onSuccess.
This code works, except when an error occurs. The intended behavior is to log the error and continue, which it does, but once an error has occurred the subscriber's onSuccess lambda never gets called.
Is the observer expecting the same number of elements to be emitted as are in the original iterable? How can I skip the errors and have it complete once all items have been iterated over? Is there something other than Single.never() that will keep the error from being forwarded downstream?
queryFiles()?.let { files ->
    Observable.fromIterable(files)
        .flatMapSingle { file ->
            uploadFile(file)
                .onErrorResumeNext { error ->
                    log(error)
                    Single.never() // if this is returned onSuccess is never called
                }
                .map { response ->
                    file.id = response.id
                    file
                }
        }
        .toList()
        .subscribe({ uploadedFiles ->
            persist(uploadedFiles) // if error occurs above, this is never called
        }, { error ->
            log(error)
        })
}
Your problem is that Single can only end in one of two ways: a successful result or a failure. Single.never() does neither, so flatMapSingle keeps waiting for that inner source, the stream never completes, and toList() never emits. Turning the failure into an 'ignored' state can be done by first converting the Single to a Maybe and then using essentially the same code to handle failure and success.
Maybe.onErrorResumeNext with a return value of Maybe.empty() yields 0 or 1 results, while Maybe.map only executes if there is a value, which handles the problem as you've described it.
Adapted code:
.flatMapMaybe { file ->
    uploadFile(file).toMaybe()
        .onErrorResumeNext { error: Throwable ->
            log(error)
            Maybe.empty()
        }
        .map { response ->
            file.id = response.id
            file
        }
}
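To see the whole chain complete even when one upload fails, here is a minimal self-contained sketch. It assumes RxJava 2 (the io.reactivex package); LocalFile, UploadResponse, the fake uploadFile, and uploadAll are hypothetical stand-ins for the question's types, not part of the original code.

```kotlin
import io.reactivex.Maybe
import io.reactivex.Observable
import io.reactivex.Single

// Hypothetical stand-ins for the question's file and response types.
data class UploadResponse(val id: Long)
data class LocalFile(val name: String, var id: Long? = null)

// Fake upload (assumption): fails for names starting with "bad", succeeds otherwise.
fun uploadFile(file: LocalFile): Single<UploadResponse> =
    if (file.name.startsWith("bad")) {
        Single.error<UploadResponse>(IllegalStateException("upload failed: ${file.name}"))
    } else {
        Single.just(UploadResponse(file.name.length.toLong()))
    }

// Uploads every file and collects only the ones that succeeded.
fun uploadAll(files: List<LocalFile>): Single<List<LocalFile>> =
    Observable.fromIterable(files)
        .flatMapMaybe { file ->
            uploadFile(file).toMaybe()
                .onErrorResumeNext { error: Throwable ->
                    // Log and complete empty, so this file is skipped
                    // and the outer stream can still complete.
                    println("skipping ${file.name}: ${error.message}")
                    Maybe.empty<UploadResponse>()
                }
                .map { response ->
                    file.id = response.id
                    file
                }
        }
        .toList()
```

With these stand-ins, uploadAll(listOf(LocalFile("a.txt"), LocalFile("bad.txt"), LocalFile("c.txt"))).blockingGet() should return only a.txt and c.txt, with bad.txt logged and skipped.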
Here is how I have handled it in the past, using the zip method.
// create a list of observables, one per file upload
val uploads = mutableListOf<Observable<Response>>()
queryFiles()?.forEach { file ->
    val observable = Observable.create(ObservableOnSubscribe<Response> { emitter ->
        // you can modify this section to your data types
        try {
            // this assumes a blocking uploadFile that returns a Response;
            // if your uploadFile already returns an Observable, you might be
            // able to just add it to the uploads list directly
            emitter.onNext(uploadFile(file))
            emitter.onComplete()
        } catch (e: Exception) {
            emitter.onError(e)
        }
    }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
    uploads.add(observable)
}
// I set up a simple BooleanArray to handle success/failure, but you can add
// all the files that fail to a list and use that later
val isSuccessful = booleanArrayOf(true)
Observable.zip<Response, Boolean>(uploads, Function<Array<Any>, Boolean> { results ->
    // results holds one entry per upload, in the same order as the uploads list
    val allSucceeded = true
    // handle success or failure
    allSucceeded
}).subscribe(Consumer<Boolean> { result -> isSuccessful[0] = result }, Consumer<Throwable> { throwable ->
    isSuccessful[0] = false
}, Action {
    // handle your onComplete here
    // I would check isSuccessful[0] and handle the success or failure
})
This turns all your uploads into a list of Observables that can be combined with the zip method. Once they are all done, zip hands them to the function as a single Array<Any> that you can loop over; each element is the result of one uploadFile() call. This example checks the responses that come back for success or failure. I removed most of the logic where the comment // handle success or failure is. Inside that function you can keep track of which file uploads failed and which succeeded.
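One possible way to fill in the // handle success or failure placeholder is sketched below, again assuming RxJava 2. UploadResult, UploadSummary, uploadBlocking, uploadObservable, and uploadAllZipped are hypothetical names introduced for illustration. Wrapping each failure with onErrorReturn is also an assumption that goes beyond the code above: without it, the first failed upload would terminate zip through onError and the other results would be lost.

```kotlin
import io.reactivex.Observable

// Hypothetical per-file outcome: error is null when the upload succeeded.
data class UploadResult(val fileName: String, val error: Throwable? = null) {
    val succeeded: Boolean get() = error == null
}

// Hypothetical summary of which files were uploaded and which failed.
data class UploadSummary(val uploaded: List<String>, val failed: List<String>)

// Fake blocking upload (assumption): throws for names starting with "bad".
fun uploadBlocking(fileName: String): String {
    if (fileName.startsWith("bad")) throw IllegalStateException("upload failed: $fileName")
    return "id-$fileName"
}

// One observable per file; a failure becomes a normal item instead of onError.
fun uploadObservable(fileName: String): Observable<UploadResult> =
    Observable.fromCallable {
        uploadBlocking(fileName)
        UploadResult(fileName)
    }.onErrorReturn { e -> UploadResult(fileName, e) }

// Zips all uploads and splits the combined array into successes and failures.
fun uploadAllZipped(fileNames: List<String>): UploadSummary =
    Observable.zip<UploadResult, UploadSummary>(fileNames.map { uploadObservable(it) }) { results ->
        // zip delivers an untyped array, so each element is cast back.
        val typed = results.map { it as UploadResult }
        val (ok, bad) = typed.partition { it.succeeded }
        UploadSummary(ok.map { it.fileName }, bad.map { it.fileName })
    }.blockingSingle(UploadSummary(emptyList(), emptyList())) // default covers an empty input list
```

With these stand-ins, uploadAllZipped(listOf("a.txt", "bad.txt", "c.txt")) should report a.txt and c.txt as uploaded and bad.txt as failed. The blocking call is only for demonstration; in an app you would subscribe as in the code above.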