I have a situation where a long-running process is wrapped in an Observable.fromCallable(). This process is an OkHttp call and, if terminated, will throw an IOException. If the observable is subscribed to, the disposable is stored in a CompositeDisposable and the exception is handled as expected. However, my code will clear the CompositeDisposable in some cases, which terminates the OkHttp thread with no error handling and causes the app to crash with an unhandled exception. Here's a simple unit test example of this problem:
@Test
public void test() {
    CompositeDisposable compositeDisposable = new CompositeDisposable();
    Observable<Object> o = Observable.fromCallable(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            System.out.println("sleeping - this sleep will be interrupted when compositeDisposable gets cleared");
            Thread.sleep(3000);
            return null;
        }
    });
    compositeDisposable.add(o.subscribeOn(new IoScheduler()).subscribe());
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    compositeDisposable.clear();
}
Is there any way to work around this problem?
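
One direction that may be worth trying, assuming this is RxJava 2.x: once the subscription has been disposed, an exception thrown inside the callable can no longer be delivered to the observer, so RxJava hands it to RxJavaPlugins.onError, which by default forwards it to the thread's uncaught exception handler. Installing a global handler with RxJavaPlugins.setErrorHandler should intercept it instead. The sketch below is only an illustration of that idea: the class name UndeliverableErrorSketch and the method runAndClear are hypothetical, and the sleep stands in for the OkHttp call.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.exceptions.UndeliverableException;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the original question.
public class UndeliverableErrorSketch {

    // Runs the scenario from the question and returns the error that
    // reached the global handler (or null if none arrived in time).
    static Throwable runAndClear() throws InterruptedException {
        AtomicReference<Throwable> captured = new AtomicReference<>();
        CountDownLatch handled = new CountDownLatch(1);

        // Global handler for errors RxJava cannot deliver to any observer.
        RxJavaPlugins.setErrorHandler(e -> {
            // RxJava wraps such errors in UndeliverableException; unwrap if so.
            Throwable cause = (e instanceof UndeliverableException && e.getCause() != null)
                    ? e.getCause()
                    : e;
            captured.set(cause);
            handled.countDown();
        });

        try {
            CompositeDisposable compositeDisposable = new CompositeDisposable();
            Observable<String> o = Observable.fromCallable(() -> {
                Thread.sleep(3000); // stand-in for the blocking OkHttp call
                return "done";      // fromCallable must not return null in RxJava 2
            });

            compositeDisposable.add(
                    o.subscribeOn(Schedulers.io())
                     .subscribe(value -> { }, error -> { }));

            Thread.sleep(500);
            compositeDisposable.clear(); // interrupts the worker thread

            // Give the worker thread a moment to report the error.
            handled.await(2, TimeUnit.SECONDS);
            return captured.get();
        } finally {
            RxJavaPlugins.setErrorHandler(null); // restore default behaviour
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Handled instead of crashing: " + runAndClear());
    }
}
```

In a real app the handler would presumably be installed once at startup, and it might log or ignore only the exceptions that are expected after cancellation (such as InterruptedException or IOException) while still surfacing anything else. Catching the exception inside the callable itself is another option if a global handler feels too broad.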