I wrote some code to download a file from server meanwhile updating progress bar. Downloading code was running in Schedulers.io
thread and updating ui code was running in AndroidSchedulers.mainThread
. My program terminated after download began. Here is my code:
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
Response response = getResponse(url);
if (response != null && response.isSuccessful()) {
InputStream is = response.body().byteStream();
subscriber.onNext(response.body().contentLength()); // init progress
File storedFile = Utils.getStoredFile(context, filePath);
OutputStream os = new FileOutputStream(storedFile);
byte[] buffer = new byte[1024];
int len;
while ((len = is.read(buffer)) != -1) {
// write data
os.write(buffer, 0, len);
count += len;
subscriber.onNext(count); // update progress
}
if (!subscriber.isUnsubscribed()) {
subscriber.onCompleted();
}
os.close();
is.close();
response.body().close();
} catch (InterruptedException e) {
subscriber.onError(e);
}
}
})
.subscribeOn(Schedulers.io()) // io and network operation
.observeOn(AndroidSchedulers.mainThread()) // UI view update operation
.subscribe(new Observer<Long>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted -> " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError -> " + e.getMessage());
}
@Override
public void onNext(Long progress) {
Log.d(TAG, "onNext -> " + Thread.currentThread().getName());
Log.d(TAG, "onNext progress -> " + progress);
// here update view in ui thread
}
}
}
And here is error text:
java.io.InterruptedIOException: thread interrupted
at okio.Timeout.throwIfReached(Timeout.java:145)
at okio.Okio$2.read(Okio.java:136)
at okio.AsyncTimeout$2.read(AsyncTimeout.java:211)
at okio.RealBufferedSource.read(RealBufferedSource.java:50)
at com.squareup.okhttp.internal.http.HttpConnection$FixedLengthSource.read(HttpConnection.java:418)
at okio.RealBufferedSource$1.read(RealBufferedSource.java:371)
at java.io.InputStream.read(InputStream.java:163)
at com.eldorado.rxfiledownloaddemo.presenter.Presenter$1.call(Presenter.java:74)
at com.eldorado.rxfiledownloaddemo.presenter.Presenter$1.call(Presenter.java:52)
at rx.Observable.unsafeSubscribe(Observable.java:8098)
at rx.internal.operators.OperatorSubscribeOn$1$1.call(OperatorSubscribeOn.java:62)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at java.util.concurrent.Executors$RunnableAdapter.call(Executor at java.util.concurrent.FutureTask.run(FutureTask.java:23 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:153)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1080)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:573)
at java.lang.Thread.run(Thread.java:841)
The observerOn is apply to the Observable.create but internaly you're creating a new observable in another thread. So your pipeline never give the monitor to the main thread. I think your code it's too much complex for what you want to achieve.
Just in case that help you out to understand the concepts of Scheduler
https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java