RxJava: InterruptedIOException

2019-09-06 07:23发布

问题:

I wrote some code to download a file from server meanwhile updating progress bar. Downloading code was running in Schedulers.io thread and updating ui code was running in AndroidSchedulers.mainThread. My program terminated after download began. Here is my code:

    Observable
    .create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            try {
                Response response = getResponse(url);
                if (response != null && response.isSuccessful()) {
                    InputStream is = response.body().byteStream();
                    subscriber.onNext(response.body().contentLength()); // init progress
                    File storedFile = Utils.getStoredFile(context, filePath);
                    OutputStream os = new FileOutputStream(storedFile);

                    byte[] buffer = new byte[1024];
                    int len;
                    while ((len = is.read(buffer)) != -1) {
                        // write data
                        os.write(buffer, 0, len);

                        count += len;
                        subscriber.onNext(count); // update progress
                    }

                    if (!subscriber.isUnsubscribed()) {
                        subscriber.onCompleted();
                    }

                    os.close();
                    is.close();
                    response.body().close();

            } catch (InterruptedException e) {
                subscriber.onError(e);
            }
        }
    })
    .subscribeOn(Schedulers.io()) // io and network operation  
    .observeOn(AndroidSchedulers.mainThread()) // UI view update operation  
    .subscribe(new Observer<Long>() {
        @Override
        public void onCompleted() {
            Log.d(TAG, "onCompleted -> " + Thread.currentThread().getName());
        }

        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "onError -> " + e.getMessage());
        }

        @Override
        public void onNext(Long progress) {
            Log.d(TAG, "onNext -> " + Thread.currentThread().getName());
            Log.d(TAG, "onNext progress -> " + progress);
            // here update view in ui thread
        }
    }
    }

And here is error text:

java.io.InterruptedIOException: thread interrupted
    at okio.Timeout.throwIfReached(Timeout.java:145)
    at okio.Okio$2.read(Okio.java:136)
    at okio.AsyncTimeout$2.read(AsyncTimeout.java:211)
    at okio.RealBufferedSource.read(RealBufferedSource.java:50)
    at com.squareup.okhttp.internal.http.HttpConnection$FixedLengthSource.read(HttpConnection.java:418)
    at okio.RealBufferedSource$1.read(RealBufferedSource.java:371)
    at java.io.InputStream.read(InputStream.java:163)
    at com.eldorado.rxfiledownloaddemo.presenter.Presenter$1.call(Presenter.java:74)
    at com.eldorado.rxfiledownloaddemo.presenter.Presenter$1.call(Presenter.java:52)
    at rx.Observable.unsafeSubscribe(Observable.java:8098)
    at rx.internal.operators.OperatorSubscribeOn$1$1.call(OperatorSubscribeOn.java:62)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executor    at java.util.concurrent.FutureTask.run(FutureTask.java:23    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:153)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1080)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:573)
    at java.lang.Thread.run(Thread.java:841)

回答1:

The observerOn is apply to the Observable.create but internaly you're creating a new observable in another thread. So your pipeline never give the monitor to the main thread. I think your code it's too much complex for what you want to achieve.

Just in case that help you out to understand the concepts of Scheduler

https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java



标签: rx-java