Schedulers.io() not returning to main thread

2019-05-06 18:04发布

I'm using RxParse to parse query's async load but when i subscribe my observable using subscribeOn(Schedulers.io()) my onCompleted method is never called on main thread. Instead of this, my onCompleted method is called inside of worker thread pool. If i use observeOn(AndroidSchedulers.mainThread) everything will work as well, but my onNextMethod will be called on main thread too and I don't want it.

There is something wrong in my code?

Have anything wrong in my code?

ParseObservable.find(myQuery)
    .map(myMapFunc())
    .subscribeOn(AndroidSchedulers.handlerThread(new Handler()))
    .subscribe(
        new Subscriber<MyObj>() {
           @Override
            public void onError(Throwable e) {
                Log.e("error","error",e);
            }

            @Override
            public void onNext(T t) {
                // ... worker thread (but here is ok)
            }

            public void onCompleted() {
                // ... worker thread again instead of mainThread
            }
        }
    )
);

4条回答
孤傲高冷的网名
2楼-- · 2019-05-06 18:38

Unfortunately the subscription is in the same thread for all methods (onNext, onError and onCompleted

But you can observe in the Schedulers.io() and inside the onNext(T t) method, create a new Observable to listen in the MainThread like this:

ParseObservable.find(myQuery)
    .map(myMapFunc())
    .subscribeOn(Schedulers.io())
    .subscribe(
        new Subscriber<MyObj>() {
           @Override
            public void onError(Throwable e) {
                Log.e("error","error",e);
            }

            @Override
            public void onNext(T t) {
                Observable.just(t)
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe((t) -> {
                         // do something in MainThread
                    })
            }

            public void onCompleted() {
                // ... worker thread again instead of mainThread
            }
        }
    )
);

I hope it help!

查看更多
可以哭但决不认输i
3楼-- · 2019-05-06 18:42

It is not advisable to subscribe within a subscription.

subscribeOn determines where the Observable chain will start when an observer subscribes to it.

observeOn can be used at different points (and multiple times, if need be) throughout your observable chain to pass control between threads. (You can verify this by checking whether you're on the main thread or not within each of these blocks).

ParseObservable.find(myQuery)
    .map(myMapFunc())

    // Added this:
    .doOnNext(obj -> {
        // NOTE: This will happen on your `subscribeOn` scheduler 
        // Do something with `obj` here while on worker thread
    }

    .subscribeOn(AndroidSchedulers.handlerThread(new Handler()))

    // Added this:
    .observeOn(AndroidSchedulers.mainThread())    

    .subscribe(new Subscriber<>() {
        next -> {
            // NOTE: This will happen on the main thread
        },
        error -> {
            Log.e("error","error",e);
            // NOTE: This will happen on the main thread
        },
        () -> {
            // NOTE: This will happen on the main thread
        }
    });
查看更多
smile是对你的礼貌
4楼-- · 2019-05-06 18:43

I would recommend using "side action" operators in this case. It seems to me like a slightly more elegant solution than using nested observables:

ParseObservable.find(myQuery)
        .map(myMapFunc())
        .subscribeOn(AndroidSchedulers.handlerThread(new Handler()))
        .doOnCompleted(() -> onCompleteAction())
        .observeOn(AndroidSchedulers.mainThread())
        .doOnNext(value -> onNext(value))
        .subscribe();
查看更多
一纸荒年 Trace。
5楼-- · 2019-05-06 18:49

First you need to understand the difference between subscribeOn() and observeOn(). These are two completely different operators that affect different parts of the Rx chain.

subscribeOn() specifies where your Observable will do its work. It will not affect where onNext(), onError(), and onComplete() execute.

observeOn() specifies where the the callbacks (e.g. onNext()) are executed. It will not affect where your Observable does its work.

All the callbacks will occur on the same thread. You cannot specify that some callbacks occur on one thread and some happen on another through any RxJava APIs. If that is the behavior you desire, you will have to implement it yourself in your callbacks.

查看更多
登录 后发表回答