Cache the result of a Mono from a WebClient call i

2020-02-29 03:53发布

问题:

I am looking to cache a Mono (only if it is successful) which is the result of a WebClient call.

From reading the project reactor addons docs I don't feel that CacheMono is a good fit as it caches the errors as well which I do not want.

So instead of using CacheMono I am doing the below:

Cache<MyRequestObject, Mono<MyResponseObject>> myCaffeineCache = 
    Caffeine.newBuilder()
            .maximumSize(100)
            .expireAfterWrite(Duration.ofSeconds(60))
            .build();

MyRequestObject myRequestObject = ...;

Mono<MyResponseObject> myResponseObject = myCaffeineCache.get(myRequestObject,
    requestAsKey -> WebClient.create()
                             .post()
                             .uri("http://www.example.com")
                             .syncBody(requestAsKey)
                             .retrieve()
                             .bodyToMono(MyResponseObject.class)
                             .cache()
                             .doOnError(t -> myCaffeineCache.invalidate(requestAsKey)));

Here I am calling cache on the Mono and then adding it to the caffeine cache.

Any errors will enter doOnError to invalidate the cache.

Is this a valid approach to caching a Mono WebClient response?

回答1:

This is one of the very few use cases where you'd be actually allowed to call non-reactive libraries and wrap them with reactive types, and have processing done in side-effects operators like doOnXYZ, because:

  • Caffeine is an in-memory cache, so as far as I know there's no I/O involved
  • Caches often don't offer strong guarantees about caching values (it's very much "fire and forget)

You can then in this case query the cache to see if a cached version is there (wrap it and return right away), and cache a successful real response in a doOn operator, like this:

public class MyService {

    private WebClient client;

    private Cache<MyRequestObject, MyResponseObject> myCaffeineCache;

    public MyService() {
        this.client = WebClient.create();
        this.myCaffeineCache = Caffeine.newBuilder().maximumSize(100)
          .expireAfterWrite(Duration.ofSeconds(60)).build();
    }

    public Mono<MyResponseObject> fetchResponse(MyRequestObject request) {

        MyResponseObject cachedVersion = this.myCaffeineCache.get(myRequestObject);
        if (cachedVersion != null) {
           return Mono.just(cachedVersion);
        } else {
           return this.client.post()
                         .uri("http://www.example.com")
                         .syncBody(request.getKey())
                         .retrieve()
                         .bodyToMono(MyResponseObject.class)
                         .doOnNext(response -> this.myCaffeineCache.put(request.getKey(), response));
    }
}

Note that I wouldn't cache reactive types here, since there's no I/O involved nor backpressure once the value is returned by the cache. On the contrary, it's making things more difficult with subscription and other reactive streams constraints.

Also you're right about the cache operator since it isn't about caching the value per se, but more about replaying what happened to other subscribers. I believe that cache and replay operators are actually synonyms for Flux.



回答2:

Actually, you don't have to save errors with CacheMono.

private Cache<MyRequestObject, MyResponseObject> myCaffeineCache;

...

Mono<MyResponseObject> myResponseObject =
        CacheMono.lookup(key -> Mono.justOrEmpty(myCaffeineCache.getIfPresent(key))
                .map(Signal::next), myRequestObject)
                .onCacheMissResume(() -> /* Your web client or other Mono here */)
                .andWriteWith((key, signal) -> Mono.fromRunnable(() ->
                        Optional.ofNullable(signal.get())
                                .ifPresent(value -> myCaffeineCache.put(key, value))));

When you switch to external cache, this may be usefull. Don't forget using reactive clients for external caches.