-->

How to manage the Looper thread needed in RxJava

Posted 2019-07-14 04:21

Question:

I want to encapsulate in a single Observable the logic that queries a ContentProvider and also registers a ContentObserver on the resulting cursor, so that it supplies continuous updates.

Because the Observable does I/O work, I need to subscribe to it on Schedulers.io(). The problem is that I then can't register a ContentObserver, because it needs a thread with a prepared Looper.

What is the recommended way to manage that and still encapsulate everything in a single Observable?

Code to illustrate that:

public Observable<Integer> unreadCountObservable() {
    return Observable.create(subscriber -> {
        new UnreadCountObservable(subscriber);
    });
}

private class UnreadCountObservable {
    private Subscriber subscriber;

    public UnreadCountObservable(Subscriber subscriber) {
        this.subscriber = subscriber;
        Cursor cursor = queryUnread(subscriber);
        cursor.registerContentObserver(observer);
        subscriber.add(Subscriptions.create(() -> {
            cursor.unregisterContentObserver(observer);
            cursor.close();
        }));
    }

    @NonNull
    private Cursor queryUnread(Subscriber subscriber) {
        Cursor cursor = contextProvider.getContext().getContentResolver().query(
                Uri.parse(CONTENT_URI), SMS_PROJECTION, SMS_SELECTION_UNREAD, SMS_PROJECTION, null);
        if(cursor.moveToNext()) {
            Integer count = cursor.getInt(0);
            subscriber.onNext(count);
        } else {
            subscriber.onNext(0);
        }
        return cursor;
    }

    private ContentObserver observer = new ContentObserver(new Handler()) {
        @Override
        public boolean deliverSelfNotifications() {
            return false;
        }

        @Override
        public void onChange(boolean selfChange) {
            Timber.d("New sms data changed");
            queryUnread(subscriber);
        }
    };
}

Note 1: The problem with the above code is that it can't be called with .subscribeOn(Schedulers.io()) because of the ContentObserver registration (its Handler needs a Looper thread), and if it is subscribed on the main thread, the queries also run there.

Note 2: Encapsulating everything in a single Observable is a key requirement and the motivation for this question.

My best idea so far is to create a HandlerThread for the Activity where I use the Observable and to use the Looper from that thread. But I want to know whether there are better alternatives, and whether making a generic scheduler (e.g. looperIoScheduler()) makes sense or can cause problems.
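The HandlerThread idea can be sketched without any Android dependency. The block below is a plain-JDK model, not Android or RxJava code: a single-threaded executor stands in for the HandlerThread's Looper, and `LooperLikeSource`, `notifyChange`, and the `IntSupplier` query are hypothetical names invented for this illustration. It only shows the shape of the approach: the initial query and every change callback run on the same dedicated background thread, never on the caller's thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.IntConsumer;
import java.util.function.IntSupplier;

// Plain-JDK stand-in for "HandlerThread + ContentObserver" (all names hypothetical).
final class LooperLikeSource implements AutoCloseable {
    // One dedicated thread plays the role of the HandlerThread's Looper.
    private final ExecutorService looper = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "looper-io");
        t.setDaemon(true);
        return t;
    });
    private final IntSupplier query;   // stands in for the ContentProvider query
    private final IntConsumer onNext;  // stands in for subscriber.onNext

    LooperLikeSource(IntSupplier query, IntConsumer onNext) {
        this.query = query;
        this.onNext = onNext;
        looper.execute(this::emit);    // initial query runs on the looper thread
    }

    // Stands in for ContentObserver.onChange: re-query on the looper thread.
    void notifyChange() {
        looper.execute(this::emit);
    }

    private void emit() {
        onNext.accept(query.getAsInt());
    }

    // Stands in for unsubscribing: let queued work drain, then stop the thread.
    @Override
    public void close() {
        looper.shutdown();
        try {
            looper.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

On Android, the corresponding pieces would presumably be a HandlerThread, a Handler built from its Looper and passed to the ContentObserver, and quitting the thread on unsubscribe. Whether one such thread should be shared as a generic scheduler is a separate design question that this sketch does not settle.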

Answer 1:

In an Observable chain you can switch threads as often as you wish.

The function rx.Observable#observeOn(rx.Scheduler) can be placed anywhere in the chain. Try something like this (pseudocode):

Observable.just(cursor)
        // Move downstream work to the main thread, which has a Looper.
        .observeOn(AndroidSchedulers.mainThread())
        .map(c -> {
            c.registerContentObserver(observer);
            return c;
        })
        // Move everything after this point back to the IO scheduler.
        .observeOn(Schedulers.io());
subscriber.add(Subscriptions.create(() -> {
    cursor.unregisterContentObserver(observer);
    cursor.close();
}));
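Switching threads between stages is not specific to RxJava. As a rough analogy using only the JDK (this is not RxJava code; `ThreadHop`, `run`, and the thread names `fake-main` and `fake-io` are hypothetical), `CompletableFuture.thenApplyAsync(fn, executor)` runs each stage on the executor it is given, much as observeOn moves the downstream operators onto a scheduler:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// JDK-only analogy for hopping threads mid-chain (all names hypothetical).
final class ThreadHop {
    // Returns the names of the threads that ran stage 1 and stage 2, in order.
    static List<String> run() {
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-main"));
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-io"));
        try {
            return CompletableFuture
                    .completedFuture(new ArrayList<String>())
                    // Stage 1 hops to the "main" executor (think: register the observer).
                    .thenApplyAsync(seen -> {
                        seen.add(Thread.currentThread().getName());
                        return seen;
                    }, main)
                    // Stage 2 hops to the "io" executor (think: do the blocking work).
                    .thenApplyAsync(seen -> {
                        seen.add(Thread.currentThread().getName());
                        return seen;
                    }, io)
                    .join();
        } finally {
            main.shutdown();
            io.shutdown();
        }
    }
}
```

Calling `ThreadHop.run()` returns the two thread names in stage order, which makes the hop visible. The analogy stops at stage placement: it says nothing about Looper requirements or subscription lifecycle.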


Answer 2:

Ok, so why won't this work?

public Observable<Integer> unreadCountObservable() {
    return Observable.defer(() -> Observable.create(subscriber -> {
        new UnreadCountObservable(subscriber);
    }))
    .subscribeOn(Schedulers.io());
}