Caching network calls using RxJava for some durati

2019-07-25 01:23发布

I am making a network using Retorfit + RxJava2 and I want to cache the response for 30 seconds. Any calls made after 30 seconds interval should get the latest results from server. I tried doing this using Replay operator but it still makes a network call every time I call subscribe. I am not an expert in RxJava so maybe my understanding of using Replay for caching like that is wrong.

public Observable<Name> getName() {
        return retrofitBuilder.getName()
                .subscribeOn(Schedulers.io())
                .replay(30, TimeUnit.SECONDS,Schedulers.io())
                .autoConnect();
    }

and I am calling the above code like this:

 service.getName()
        .subscribe(new Consumer<Name>()
            {
                @Override
                public void accept(Name name) throws Exception
                {
                    Log.d("getName", "Name: " + name.toString());
                }
            }
            , new Consumer<Throwable>()
            {
                @Override
                public void accept(Throwable throwable) throws Exception
                {
                    Log.d("getName", throwable.getMessage());
                }
            });

UPDATE: My apology if I didn't explain my question clearly. What I want is caching on a particular request instead of caching it on HttpClient level which applies the caching strategy to all the request being made through it. In the end I would like to define different caching expiration for different request when needed. Not all my request needs caching for small duration. I was wondering if I could do just that.

Appreciate your help in this.

2条回答
唯我独甜
2楼-- · 2019-07-25 01:46

Try to look at okhttp interceptors.

Add CacheInterceptor:

public class CacheInterceptor implements Interceptor {
    @Override
    public Response intercept(Chain chain) throws IOException {
        Response response = chain.proceed(chain.request());

        CacheControl cacheControl = new CacheControl.Builder()
                .maxAge(30, TimeUnit.SECONDS)
                .build();

        return response.newBuilder()
                .removeHeader("Pragma")
                .removeHeader("Cache-Control")
                .header("Cache-Control", cacheControl.toString())
                .build();
    }
}

And add it and cache to your OkHttp Client like this:

File httpCacheDirectory = new File(context.getCacheDir(), "http-cache");
int cacheSize = 10 * 1024 * 1024; // 10 MiB
Cache cache = new Cache(httpCacheDirectory, cacheSize);

OkHttpClient httpClient = new OkHttpClient.Builder()
                               .addNetworkInterceptor(new CacheInterceptor())
                               .cache(cache)
                               .build();
查看更多
再贱就再见
3楼-- · 2019-07-25 01:49

The are 2 problem with your approach:

  1. as @drhr mentioned, you are creating a new Observable each time you call service.getName() you're creating a new instance of Observable, you should keep the same replayed instance and give to the caller outside the same instance each time it calls service.getName().
  2. even if you will return the same instance, replay with 30 seconds, will replay the sequence emitted by the source Observable over the last 30 sec, meaning after cache expiration time, you will get nothing as your request happened more than 30 sec ago. it doesn't mean that the Observable will restart automatically after this period.

In order to cache for specific period, you basically need to invalidate the cached response after cache period, and perform new request after this period, that's mean you should control your subscribe, and do it there.
You can achieve it with something like that:

public class CachedRequest<T> {

    private final AtomicBoolean expired = new AtomicBoolean(true);
    private final Observable<T> source;
    private final long cacheExpirationInterval;
    private final TimeUnit cacheExpirationUnit;
    private Observable<T> current;

    public CachedRequest(Observable<T> o, long cacheExpirationInterval,
                         TimeUnit cacheExpirationUnit) {
        source = o;
        current = o;
        this.cacheExpirationInterval = cacheExpirationInterval;
        this.cacheExpirationUnit = cacheExpirationUnit;
    }

    private Observable<T> getCachedObservable() {
        return Observable.defer(() -> {
            if (expired.compareAndSet(true, false)) {
                current = source.cache();
                Observable.timer(cacheExpirationInterval, cacheExpirationUnit)                          
                        .subscribe(aLong -> expired.set(true));
            }
            return current;
        });
    }
}

with defer you can return the right Observable according to cache expiration status, so every subscribe happened within the cache expiration will get cached Observable (using cache()) - meaning request will be performed only once. after cache expiration, additional subscribe will trigger new request and will set a new timer to reset the cache expiration.

查看更多
登录 后发表回答