The use case is this: I want to temporarily cache the latest emitted expensive Observable response, but after it expires, return to the expensive source Observable and cache it again, etc.
A pretty basic network cache scenario, but I'm really struggling to get it working.
private Observable<String> getContentObservable() {
    // expensive upstream source (API, etc.)
    Observable<String> sourceObservable = getSourceObservable();
    // cache 1 result for 30 seconds, then return to the source
    return sourceObservable
            .replay(1, 30, TimeUnit.SECONDS)
            .autoConnect()
            .switchIfEmpty(sourceObservable);
}
Initial request: goes to source.
Second request within 30 seconds of source emitting: delivered from cache.
Third request outside of cache expiry window: nothing. I subscribe to it and I get no data, but it's not switching to the upstream source Observable.
It looks as if I'm just connecting to my ConnectableObservable from autoConnect() and it's never completing with empty, so it's never triggering my switchIfEmpty().
How can I use this combination of replay(1,x,x) and switchIfEmpty()?
Or am I just approaching this wrong from the start?
The problem here is that replay just repeats the sequence emitted by sourceObservable in the last 30 seconds. When you subscribe after those 30 seconds, the sequence has no events, not even onCompleted(), so switchIfEmpty() cannot work: it depends on an onCompleted() signal arriving without any prior emissions to know that the stream is empty.
In general, replay is not sufficient for a cache scenario. What you need is a way to resubscribe to the source once the cache has expired, and to do so on demand, meaning only when some client subscribes. (You could build a cache that refreshes itself every 30 seconds, but I guess that's not the desired behavior.)
So, as @Yurly Kulikov suggested, you need to maintain state, and to control the subscription yourself in order to maintain it. But I think there is a major flaw in that solution: it's not exactly thread-safe. If two clients subscribe one after another, say A and B, then while A executes the request and waits to save the new result in the cache, B can subscribe as well, and another request will be executed because the cached value hasn't been set by A yet (A's network request hasn't finished).
I suggest a similar approach with a different implementation, which I suggested here:
With defer you can return the right Observable according to the cache expiration status, so every subscription that happens before the cache expires gets the cached Observable (using cache()), meaning the request is performed only once. After the cache expires, the next subscription triggers a new request and sets a new timer to reset the cache expiration.
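To make that decision logic concrete, here is a minimal sketch of the same idea in plain Java, without RxJava, so it runs on the JDK alone. It is not the code from the linked answer. `ExpiringCache`, its `clock` parameter and the use of `CompletableFuture` as a stand-in for the cached Observable are all assumptions made for illustration. The `get()` method plays the role of the function passed to `Observable.defer(...)`, and keeping the future in a field plays the role of `cache()`. Note that in this sketch the expiry window starts when the request is started, not when the value arrives.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical helper: shares one in-flight or completed result until it expires. */
final class ExpiringCache<T> {
    private final Supplier<CompletableFuture<T>> source; // expensive upstream call
    private final long ttlNanos;
    private final LongSupplier clock; // injectable (e.g. System::nanoTime) so expiry is testable

    private CompletableFuture<T> cached; // guarded by "this"
    private long expiresAt;              // guarded by "this"

    ExpiringCache(Supplier<CompletableFuture<T>> source, long ttl, TimeUnit unit, LongSupplier clock) {
        this.source = source;
        this.ttlNanos = unit.toNanos(ttl);
        this.clock = clock;
    }

    /**
     * Decides on every call, as defer would on every subscription:
     * reuse the cached future, or start a new request and restart the timer.
     * Because the future itself is stored before the request finishes,
     * a second caller arriving mid-request shares it instead of starting another.
     */
    synchronized CompletableFuture<T> get() {
        long now = clock.getAsLong();
        boolean expired = cached == null || now - expiresAt >= 0;
        // Assumption: a failed request should not stay cached for the whole window.
        if (expired || cached.isCompletedExceptionally()) {
            cached = source.get();
            expiresAt = now + ttlNanos;
        }
        return cached;
    }
}
```

The `synchronized` block covers only the check and the assignment, not the request itself, so callers are not blocked while the network call is running. In an RxJava version, the stored field would hold `sourceObservable.cache()` and `get()` would be wrapped in `Observable.defer(...)`.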
So it turns out you can use Jake Wharton's RxReplayingShare to cache the last value even after dispose. https://github.com/JakeWharton/RxReplayingShare
You will have to maintain a state shared between many callers. That is why you cannot create the Observable every time getContentObservable() is called.
One way to do it is to create an Observable outside, hold the internal state in the Observable (e.g. using buffer), but implementing stateful behaviour is often easier without Observables.
Here is an example with shared state in a field: