RxJava pattern for requesting a remote Observable

Posted 2019-03-27 10:02

The use case is this: I want to temporarily cache the latest response emitted by an expensive Observable, and after the cache expires, go back to the expensive source Observable and cache its result again, and so on.

A pretty basic network cache scenario, but I'm really struggling to get it working.

private Observable<String> getContentObservable() {

    // expensive upstream source (API, etc.)
    Observable<String> sourceObservable = getSourceObservable();

    // cache 1 result for 30 seconds, then return to the source
    return sourceObservable
            .replay(1, 30, TimeUnit.SECONDS)
            .autoConnect()
            .switchIfEmpty(sourceObservable);
}

Initial request: goes to the source.
Second request, within 30 seconds of the source emitting: delivered from the cache.
Third request, outside the cache expiry window: nothing. I subscribe to it and get no data, but it doesn't switch to the upstream source Observable.

It looks as if I'm just connecting to the ConnectableObservable from autoConnect(), and it never completes empty, so my switchIfEmpty() is never triggered.

How can I use this combination of replay(1,x,x) and switchIfEmpty()?

Or am I just approaching this wrong from the start?

3 answers
Emotional °昔
Post #2 · 2019-03-27 10:30
return sourceObservable
            .replay(1, 30, TimeUnit.SECONDS)
            .autoConnect()
            .switchIfEmpty(sourceObservable);

Initial request: goes to the source.
Second request, within 30 seconds of the source emitting: delivered from the cache.
Third request, outside the cache expiry window: nothing. I subscribe to it and get no data, but it doesn't switch to the upstream source Observable.

The problem here is that replay just repeats the sequence the sourceObservable emitted in the last 30 seconds. When you subscribe after 30 seconds, the replayed sequence has no events, not even onCompleted(), so switchIfEmpty() never kicks in: it relies on an onCompleted() signal arriving without any preceding emissions to know that the sequence is 'empty'.

In general, replay alone is not sufficient for a cache scenario: what you need is a way to resubscribe to the source when the cache has expired, and to do so on demand, meaning only when some client subscribes. (You could build a cache that refreshes itself every 30 seconds, but I guess that's not the desired behavior.)


So, as @Yurly Kulikov suggested, you need to maintain state and control the subscription in order to maintain it. But I think there is a major flaw in that solution: it's not exactly thread-safe. If two subscribers, A and B, subscribe one right after the other, B can subscribe while A is still executing the request and waiting to save the new result in the cache. Since A hasn't set the cached value yet (its network request hasn't finished), B triggers another request.
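To make that race concrete, here is a minimal sketch in plain Java, deliberately without RxJava so it runs on the JDK alone. NaiveCache and its fetch supplier are hypothetical names; like a doOnNext side effect, it stores the value only once the response arrives, so a caller that arrives while the request is pending also misses and starts its own request.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical check-then-act cache (plain JDK, no RxJava).
// The value is stored only when the response arrives, like a doOnNext side effect.
class NaiveCache<T> {
    private final Supplier<CompletableFuture<T>> fetch; // starts one "network request"
    private volatile T cached;                          // null means nothing cached yet

    NaiveCache(Supplier<CompletableFuture<T>> fetch) {
        this.fetch = fetch;
    }

    CompletableFuture<T> get() {
        T value = cached;
        if (value != null) {
            return CompletableFuture.completedFuture(value); // cache hit
        }
        // Cache miss: start a request and remember the result once it completes.
        // Until then, every other caller also misses and starts its own request.
        return fetch.get().thenApply(v -> {
            cached = v;
            return v;
        });
    }
}
```

Calling get() twice before the pending request completes invokes fetch twice, which is the duplicate request described above.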

I suggest a similar approach with a different implementation, which I suggested here:

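import io.reactivex.Observable; // RxJava 2; use rx.Observable with RxJava 1.x
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class CachedRequest<T> {

    private final AtomicBoolean expired = new AtomicBoolean(true);
    private final Observable<T> source;
    private final long cacheExpirationInterval;
    private final TimeUnit cacheExpirationUnit;
    private Observable<T> current; // guarded by "this"

    public CachedRequest(Observable<T> o, long cacheExpirationInterval,
                         TimeUnit cacheExpirationUnit) {
        source = o;
        current = o;
        this.cacheExpirationInterval = cacheExpirationInterval;
        this.cacheExpirationUnit = cacheExpirationUnit;
    }

    // Public so callers can actually subscribe to the cached request.
    public Observable<T> getCachedObservable() {
        return Observable.defer(() -> {
            // Lock so a concurrent subscriber cannot see the flag already reset
            // while the new cached Observable has not been stored yet.
            synchronized (this) {
                if (expired.compareAndSet(true, false)) {
                    // cache() subscribes to the source on first subscription
                    // and replays its events to every later subscriber
                    current = source.cache();
                    // after the interval, the next subscriber triggers a new request
                    Observable.timer(cacheExpirationInterval, cacheExpirationUnit)
                            .subscribe(aLong -> expired.set(true));
                }
                return current;
            }
        });
    }
}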

With defer, you can return the right Observable according to the cache expiration status: every subscription within the expiration window gets the cached Observable (created with cache()), so the request is performed only once. After the cache expires, the next subscription triggers a new request and starts a new timer that resets the expiration.
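The same idea can be sketched on plain JDK types, assuming CompletableFuture as a stand-in for source.cache() (SharedRequestCache, fetch, and the injectable clock are hypothetical names, not RxJava API). The in-flight future itself is stored under a lock, so every caller inside the window shares one request, and the window starts when the request is issued, as with the timer in CachedRequest.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical plain-JDK analogue of CachedRequest: the stored in-flight
// CompletableFuture plays the role of source.cache().
class SharedRequestCache<T> {
    private final Supplier<CompletableFuture<T>> fetch; // starts one request
    private final long ttlNanos;
    private final LongSupplier clock;     // nanosecond clock, injectable for tests
    private CompletableFuture<T> current; // guarded by "this"
    private long expiresAt;               // guarded by "this"

    SharedRequestCache(Supplier<CompletableFuture<T>> fetch, long ttl, TimeUnit unit,
                       LongSupplier clock) {
        this.fetch = fetch;
        this.ttlNanos = unit.toNanos(ttl);
        this.clock = clock;
    }

    SharedRequestCache(Supplier<CompletableFuture<T>> fetch, long ttl, TimeUnit unit) {
        this(fetch, ttl, unit, System::nanoTime);
    }

    synchronized CompletableFuture<T> get() {
        long now = clock.getAsLong();
        // Subtraction keeps the comparison correct if nanoTime wraps around.
        if (current == null || now - expiresAt >= 0) {
            // Store the request itself, not its result, so callers arriving
            // while it is still in flight share it instead of starting another.
            current = fetch.get();
            expiresAt = now + ttlNanos; // window starts when the request is issued
        }
        return current;
    }
}
```

One design consequence: a failed future also stays cached until the window closes, much as cache() replays an error to later subscribers.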

地球回转人心会变
Post #3 · 2019-03-27 10:32

So it turns out you can use Jake Wharton's ReplayingShare to cache the last value even after the upstream subscription is disposed: https://github.com/JakeWharton/RxReplayingShare

在下西门庆
Post #4 · 2019-03-27 10:32

You will have to maintain state that is shared between all callers. That is why the cache cannot live in a new Observable created every time getContentObservable() is called.

One way to do it is to create the Observable once, outside this method, and hold the internal state in the Observable (e.g. using buffer), but stateful behaviour is often easier to implement without Observables.
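For comparison, a sketch of the "without Observables" route, assuming a blocking fetch is acceptable (ExpiringValue is a hypothetical helper, not part of RxJava). A synchronized getter refetches once the value is older than the TTL, counted from when the value arrived, like the 30-second timer that starts after the emission in the code below.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical blocking cache without Observables.
class ExpiringValue<T> {
    private final Supplier<T> fetch;  // blocking call, e.g. the expensive API request
    private final long ttlNanos;
    private final LongSupplier clock; // nanosecond clock, injectable for tests
    private boolean present;          // guarded by "this"
    private T value;                  // guarded by "this"
    private long fetchedAt;           // guarded by "this"

    ExpiringValue(Supplier<T> fetch, long ttl, TimeUnit unit, LongSupplier clock) {
        this.fetch = fetch;
        this.ttlNanos = unit.toNanos(ttl);
        this.clock = clock;
    }

    ExpiringValue(Supplier<T> fetch, long ttl, TimeUnit unit) {
        this(fetch, ttl, unit, System::nanoTime);
    }

    synchronized T get() {
        if (!present || clock.getAsLong() - fetchedAt >= ttlNanos) {
            // The lock is held during the fetch, so concurrent callers wait for
            // this one request instead of issuing their own.
            value = fetch.get();
            fetchedAt = clock.getAsLong(); // TTL counts from when the value arrived
            present = true;
        }
        return value;
    }
}
```

Holding the lock during the fetch closes the duplicate-request race described earlier in this thread, at the cost of blocking concurrent callers. Assuming RxJava 2, it could be exposed to Observable consumers with something like Observable.fromCallable(cache::get).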

Here is an example with shared state in a field:

// Assumes RxJava 2 (io.reactivex.*) and java.util.Optional.
// Assumption: `scheduler` was left undefined; any single-threaded Scheduler
// works, e.g. Schedulers.single(), so every access to `cached` is serialized.
private final Scheduler scheduler = Schedulers.single();

private Optional<String> cached = Optional.empty();

private Observable<String> getContentObservable() {
    //use defer to delay cache evaluation to the point when someone subscribes
    return Observable.defer(
        () ->
            cached.isPresent()
                ? Observable.just(cached.get()) // emit the String, not the Optional
                : fetchAndCache()
    )
    //use the same scheduler for every cached field access
    .subscribeOn(scheduler);
}

private Observable<String> fetchAndCache() {
    Observable<String> cachedSource = getSourceObservable()
        //I assume you only need one, make sure it is 1
        .take(1)
        .cache();

    cachedSource
        .observeOn(scheduler)
        //side-effect stores the state
        .doOnNext(str -> cached = Optional.of(str))
        .flatMap(str -> Observable.timer(30, TimeUnit.SECONDS, scheduler))
        //another side-effect clears the cache
        .subscribe(l -> cached = Optional.empty());

    return cachedSource;
}