I want to encapsulate in an Observable
a logic that queries a ContentProvider
and also subscribes ContentProvider
cursor to supply continuous updates.
As the observable does IO work I need to subscribe it in Schedulers.io()
. The problem with that is that then I can't register a ContentObserver
because it needs a looper prepared thread.
What is the recommended way to manage that and encapsulate it in a single Observable
.
Code to illustrate that:
public Observable<Integer> unreadCountObservable() {
return Observable.create(subscriber -> {
new UnreadCountObservable(subscriber);
});
}
private class UnreadCountObservable {
private Subscriber subscriber;
public UnreadCountObservable(Subscriber subscriber) {
this.subscriber = subscriber;
Cursor cursor = queryUnread(subscriber);
cursor.registerContentObserver(observer);
subscriber.add(Subscriptions.create(() -> {
cursor.unregisterContentObserver(observer);
cursor.close();
}));
}
@NonNull
private Cursor queryUnread(Subscriber subscriber) {
Cursor cursor = contextProvider.getContext().getContentResolver().query(Uri.parse(CONTENT_URI),SMS_PROJECTION,SMS_SELECTION_UNREAD,SMS_PROJECTION,null);
if(cursor.moveToNext()) {
Integer count = cursor.getInt(0);
subscriber.onNext(count);
} else {
subscriber.onNext(0);
}
return cursor;
}
private ContentObserver observer = new ContentObserver(new Handler()) {
@Override
public boolean deliverSelfNotifications() {
return false;
}
@Override
public void onChange(boolean selfChange) {
Timber.d("New sms data changed");
queryUnread(subscriber);
}
};
}
Note 1 The problem with the above code is that it can't be called with .subscribeOn(Schedulers.io()
due to the registerObserver, and if it's called it mainThread then the queries also run on them)
Note: Encapsulating all in a single Observable
is a key requirement and the motive of this question
My best idea now is to create a HandlerThread for activity where I use the Observable and use the looper from that thread. But want to know if there are better alternatives, and if making a generic scheduler (e.g looperIoScheduler()) makes sense on can cause problems.
Ok, so why won't this work?
In
Observable
chain you can change the thread as often as you wish. Have a look here.The function
rx.Observable#observeOn(rx.Scheduler)
can be in any place inside the chain. Try to do something like this (pseudocode):