How to properly handle onError inside RxJava (Andr

2019-04-03 06:04发布

问题:

I'm getting a list of installed apps on the device. It's a costly operation, so I'm using Rx for that:

    Observable<List> observable = Observable.create(subscriber -> {
        List result = getUserApps();

        subscriber.onNext(result);
        subscriber.onError(new Throwable());
        subscriber.onCompleted();
    });

    observable
            .map(s -> {
                ArrayList<String> list = new ArrayList<>();
                ArrayList<Application> applist = new ArrayList<>();
                for (Application p : (ArrayList<Application>) s) {
                    list.add(p.getAppName());
                    applist.add(p);
                }
                return applist;
            })
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnError(throwable -> L.e(TAG, "Throwable " + throwable.getMessage()))
            .subscribe(s -> createListView(s, view));

However, my problem is with handling errors. Normally, user launches this screen, waits for apps to load, selects what is best and goes to next page. However, when user quickly changes the UI - app crashes with NullPointer.

Okay, so I implemented this onError. However it still doesn't work, and with above usecase it throws me this:

    04-15 18:12:42.530  22388-22388/pl.digitalvirgo.safemob E/AndroidRuntime﹕ FATAL EXCEPTION: main
        java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling.
                at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:52)
                at android.os.Handler.handleCallback(Handler.java:730)
                at android.os.Handler.dispatchMessage(Handler.java:92)
                at android.os.Looper.loop(Looper.java:176)
                at android.app.ActivityThread.main(ActivityThread.java:5419)
                at java.lang.reflect.Method.invokeNative(Native Method)
                at java.lang.reflect.Method.invoke(Method.java:525)
                at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:1046)
                at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:862)
                at dalvik.system.NativeStart.main(Native Method)
         Caused by: rx.exceptions.OnErrorNotImplementedException
                at rx.Observable$31.onError(Observable.java:7134)
                at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:154)
                at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:111)
                at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:70)
                at rx.internal.operators.NotificationLite.accept(NotificationLite.java:147)
                at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.pollQueue(OperatorObserveOn.java:177)
                at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.access$000(OperatorObserveOn.java:65)
                at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber$2.call(OperatorObserveOn.java:153)
                at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:47)
                at android.os.Handler.handleCallback(Handler.java:730)
                at android.os.Handler.dispatchMessage(Handler.java:92)
                at android.os.Looper.loop(Looper.java:176)
                at android.app.ActivityThread.main(ActivityThread.java:5419)
                at java.lang.reflect.Method.invokeNative(Native Method)
                at java.lang.reflect.Method.invoke(Method.java:525)
                at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:1046)
                at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:862)
                at dalvik.system.NativeStart.main(Native Method)
         Caused by: java.lang.Throwable
                at pl.digitalvirgo.safemob.fragments.wizard.ApplicationsFragment.lambda$getAppList$25(ApplicationsFragment.java:267)
                at pl.digitalvirgo.safemob.fragments.wizard.ApplicationsFragment.access$lambda$2(ApplicationsFragment.java)
                at pl.digitalvirgo.safemob.fragments.wizard.ApplicationsFragment$$Lambda$3.call(Unknown Source)
                at rx.Observable$1.call(Observable.java:145)
                at rx.Observable$1.call(Observable.java:137)
                at rx.Observable.unsafeSubscribe(Observable.java:7304)
                at rx.internal.operators.OperatorSubscribeOn$1$1.call(OperatorSubscribeOn.java:62)
                at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:47)
                at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:390)
                at java.util.concurrent.FutureTask.run(FutureTask.java:234)
                at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:153)
                at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1080)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:573)
                at java.lang.Thread.run(Thread.java:841)

How should I properly handle this problem?

回答1:

My take is: you are probably using Action1 in

.subscribe(s -> createListView(s, view));

You will need to replace it with Subscriber or Observer which has abstract method onError. This method will be called from subscriber.onError(new Throwable());

EDIT: This is how I would do it. Upon closer look I think the main problem in your code is the early part where you call subscriber.onError even when there's no error. You probably don't need map either because you are technically passing data as-is without manipulation. But I left it in case it is needed later.

     Observable.create(new Observable.OnSubscribe<Application>() {
        @Override
        public void call(Subscriber<? super Application> subscriber) {
            List result = getUserApps();
            if (result != null){
                for (Application app : result){
                     subscriber.onNext(app);
                }
                subscriber.onComplete();
            }else{
                subscriber.onError(new IOException("no permission / no internet / etc"));
               //or if this is a try catch event you can pass the exception
            }     
        }
     })
    .subscribeOn(Schedulers.io())//the thread *observer* runs in
    .observeOn(AndroidSchedulers.mainThread())//the thread *subscriber* runs in
    .map(new Func1<Application, String>() {

        // Mapping methods are where data are manipulated. 
        // You can simply skip this and 
        //do the same thing in Subscriber implementation
        @Override
        public String call(Application application) {
            return application.getName();
        }
    }).subscribe(new Subscriber<String>() {
        @Override
        public void onCompleted() {
           Toast.makeText(context, "completed", Toast.LENGTH_SHORT).show();
           //because subscriber runs in main UI thread it's ok to do UI stuff
           //raise Toast, play sound, etc
        }

        @Override
        public void onError(Throwable e) {
           Log.e("getAppsError", e.getMessage());
           //raise Toast, play sound, etc
        }

        @Override
        public void onNext(String s) {
            listAdapter.add(s);
        }
    });


回答2:

.doOnError() is an operator, and is not as such a part of the Subscriber.

Therefore, having a .doOnError() does not count as an implemented onError().

About the question in one of the comments, of course it is possible to use lambdas.

In this case simply replace

.doOnError(throwable -> L.e(TAG, "Throwable " + throwable.getMessage()))
.subscribe(s -> createListView(s, view))

with

.subscribe(s -> createListView(s, view),
    throwable -> L.e(TAG, "Throwable " + throwable.getMessage()))


回答3:

Here is the newbie response (because I am new in javarx and finally fix this issue) :

Here is your implementation :

    Observable.create(new Observable.OnSubscribe<RegionItem>() {
                @Override
                public void call(Subscriber<? super RegionItem> subscriber) {
                    subscriber.onError(new Exception("TADA !"));
                }
            })
            .doOnNext(actionNext)
            .doOnError(actionError)
            .doOnCompleted(actionCompleted)
            .subscribe();

In this previous implementation, when I subscribe I trigger the error flow ... and I get an application crash.

The problem is that you HAVE TO manage the error from the subscribe() call. The "doOnError(...)" is just a kind of helper that clone the error and give you a new place to do some action after an error. But it doesn't Handle the error.

So you have to change your code with that :

    Observable.create(new Observable.OnSubscribe<RegionItem>() {
                @Override
                public void call(Subscriber<? super RegionItem> subscriber) {
                    subscriber.onError(new Exception("TADA !"));
                }
            })
            .subscribe(actionNext, actionError, actionCompleted);

Not sure about the real explanation, but this is how I fix it. Hope it will help.