I have the following class:
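import rx.Observable;
import rx.subjects.BehaviorSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

public class SessionStore {
    Subject<Session, Session> subject;

    public SessionStore() {
        // BehaviorSubject replays the latest session to new subscribers;
        // SerializedSubject makes onNext safe to call from any thread.
        subject = new SerializedSubject<>(BehaviorSubject.create(new Session()));
    }

    public void set(Session session) {
        subject.onNext(session);
    }

    public Observable<Session> observe() {
        // Only emit when the session actually changes.
        return subject.distinctUntilChanged();
    }
}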
In the activity I observe the session and perform a network operation on each change:
private Subscription init() {
    return sessionStore
            .observe()
            .flatMap(new Func1<Session, Observable<Object>>() {
                @Override
                public Observable<Object> call(Session session) {
                    // Pick the request based on the current session state.
                    return (session.isValid()
                            ? retrofitService.getThingForValid()
                            : retrofitService.getThingForInvalid())
                            .subscribeOn(Schedulers.io());
                }
            })
            .subscribe(...);
}
I also have an OkHttp request interceptor, in which I switch the session instance from valid to invalid whenever a network response has a non-200 status code.
This is what happens:
- On the initial subscription to the session store, getThingForValid() is executed and fails.
- OkHttp intercepts the failure and sets a new session.
- The session store emits a new, now invalid, session.
- The new emission triggers getThingForInvalid().
  What is important to know is that this execution happens in the middle of the previous Retrofit call. This is because the OkHttp client is wrapped by Retrofit, so all interceptors run before Retrofit returns. With this in mind, the second call is already being executed and processed by Retrofit while the first one has not finished yet.
- As the first call finishes, it throws an HttpException because the response had a non-200 status code.
- The exception kills the Rx stream and, with it, the second call.
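To make the ordering in the list above easier to follow, here is a minimal single-threaded sketch of it. It deliberately uses only the JDK, with no RxJava, Retrofit, or OkHttp. The Store class, the run() method, and the log strings are hypothetical stand-ins for my real classes, and the "interceptor" is reduced to a single store.set(false) call. It only illustrates the nesting of the two calls, not the real threading, where the second call is scheduled on Schedulers.io().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified model of the flow described above (assumption: not the real stack).
class ReentrantCallDemo {

    // Hypothetical stand-in for SessionStore: delivers values synchronously.
    static class Store {
        private boolean valid = true;
        private Consumer<Boolean> listener;

        void observe(Consumer<Boolean> listener) {
            this.listener = listener;
            listener.accept(valid); // replay the current value, like BehaviorSubject
        }

        void set(boolean newValid) {
            if (newValid == valid) {
                return; // mimic distinctUntilChanged()
            }
            valid = newValid;
            listener.accept(valid);
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Store store = new Store();
        try {
            store.observe(valid -> {
                if (valid) {
                    log.add("getThingForValid started");
                    // The "interceptor" runs before the first call returns
                    // and invalidates the session, which re-enters this listener.
                    store.set(false);
                    log.add("getThingForValid failed");
                    throw new IllegalStateException("HTTP error");
                } else {
                    // In the real code this request would now be in flight.
                    log.add("getThingForInvalid started");
                }
            });
        } catch (IllegalStateException e) {
            // The error from the first call ends the whole stream.
            log.add("stream terminated: " + e.getMessage());
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this model the second request is started before the first one reports its failure, and the failure then tears everything down, which matches what I see in the real stream.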
I have tried to ignore this exception in the stream, but the second call is cancelled by Retrofit anyway.
Do you have any ideas on how to make this concept work?