Cache handling with RxJava

Posted 2019-04-09 08:02

I'm trying to implement this workflow with RxJava, but I'm not sure whether I'm misusing it or doing something wrong.

  • User asks to login
  • If a loginResult is available in cache then "emit" the cached LoginResult
  • Otherwise, actually perform the request to the web service and cache the result if everything is successful
  • If an error occurs, retry at most 3 times, and if it fails a 4th time then purge the cache.

Here is my full snippet of code.

public class LoginTask extends BaseBackground<LoginResult> {
  private static CachedLoginResult cachedLoginResult = new CachedLoginResult();
  private XMLRPCClient xmlrpcClient;
  private UserCredentialsHolder userCredentialsHolder;

  @Inject
  public LoginTask(XMLRPCClient client, UserCredentialsHolder userCredentialsHolder) {
    this.xmlrpcClient = client;
    this.userCredentialsHolder = userCredentialsHolder;
  }

  @Override
  public LoginResult performRequest() throws Exception {
    return UserApi.login(
        xmlrpcClient,
        userCredentialsHolder.getUserName(),
        userCredentialsHolder.getPlainPassword());


  }

  @Override
  public Observable<LoginResult> getObservable() {
    return cachedLoginResult.getObservable()
        .onErrorResumeNext(
            Observable.create(
                ((Observable.OnSubscribe<LoginResult>) subscriber -> {
                  try {
                    if (!subscriber.isUnsubscribed()) {
                      subscriber.onNext(performRequest()); // actually performRequest
                    }
                    subscriber.onCompleted();
                  } catch (Exception e) {
                    subscriber.onError(e);
                  }
                })
            )
                .doOnNext(cachedLoginResult::setLoginResult)
                .retry((attempts, t) -> attempts < 3)
                .doOnError(throwable -> cachedLoginResult.purgeCache())
        );
  }


  private static class CachedLoginResult {
    private LoginResult lr = null;
    private long when = 0;

    private CachedLoginResult() {
    }

    public boolean hasCache() {
      return lr != null && when + TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES) > System.currentTimeMillis();
    }

    public void setLoginResult(LoginResult lr) {
      if (lr != null) {
          this.lr = lr;
          this.when = System.currentTimeMillis();
      }
    }

    public void purgeCache() {
      this.lr = null;
      this.when = 0;
    }

    public Observable<LoginResult> getObservable() {
      return Observable.create(new Observable.OnSubscribe<LoginResult>() {
        @Override
        public void call(Subscriber<? super LoginResult> subscriber) {
          if (!subscriber.isUnsubscribed()) {
            if (hasCache()) {
              subscriber.onNext(lr);
              subscriber.onCompleted();
            } else {
              subscriber.onError(new RuntimeException("No cache"));
            }
          }
        }
      });
    }
  }
}

Since I wasn't able to find any similar examples and I started "playing" with RxJava just 1 day ago, I'm unsure of my implementation.

Thank you for your time.

2 answers
聊天终结者
Reply #2 · 2019-04-09 08:27

If I understand correctly, you want to perform the login once and cache the result in a reactive manner? If so, here is an example of how I would do it:

import java.util.concurrent.ThreadLocalRandom;

import rx.*;
import rx.schedulers.Schedulers;
import rx.subjects.AsyncSubject;


public class CachingLogin {
    static class LoginResult {

    }
    /** Guarded by this. */
    AsyncSubject<LoginResult> cache;
    public Observable<LoginResult> login(String username, String password) {
        AsyncSubject<LoginResult> c;
        boolean doLogin = false;
        synchronized (this) {
            if (cache == null || cache.hasThrowable()) {
                cache = AsyncSubject.create();
                doLogin = true;
            }
            c = cache;
        }
        if (doLogin) {
            Observable.just(1).subscribeOn(Schedulers.io())
            .map(v -> loginAPI(username, password))
            .retry(3).subscribe(c);
        }
        return c;
    }
    public void purgeCache() {
        synchronized (this) {
            cache = null;
        }
    }
    static LoginResult loginAPI(String username, String password) {
        if (ThreadLocalRandom.current().nextDouble() < 0.3) {
            throw new RuntimeException("Failed");
        }
        return new LoginResult();
    }
}
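
For readers more used to plain Java, roughly the same idea (share one in-flight login, start a new one only if the cached one failed) can be sketched with CompletableFuture instead of AsyncSubject. This is only an analogy, not RxJava code: FutureCachingLogin, the Supplier<String> standing in for loginAPI, and the maxAttempts parameter are all hypothetical names made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Plain-Java analogy of the AsyncSubject cache above (not RxJava).
// All names here are hypothetical and chosen for illustration only.
class FutureCachingLogin {
    /** Guarded by this. */
    private CompletableFuture<String> cache;
    private final Supplier<String> loginCall; // stand-in for loginAPI(username, password)
    private final int maxAttempts;            // total attempts, including the first one

    FutureCachingLogin(Supplier<String> loginCall, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.loginCall = loginCall;
        this.maxAttempts = maxAttempts;
    }

    synchronized CompletableFuture<String> login() {
        // Start a new login only if nothing is cached or the cached login failed.
        if (cache == null || cache.isCompletedExceptionally()) {
            cache = CompletableFuture.supplyAsync(this::callWithRetry);
        }
        return cache;
    }

    synchronized void purgeCache() {
        cache = null;
    }

    private String callWithRetry() {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return loginCall.get(); // the call is re-invoked on every attempt
            } catch (RuntimeException e) {
                last = e;
            }
        }
        throw last; // every attempt failed; the future completes exceptionally
    }
}
```

Callers that arrive while the login is still running get the same future, much like subscribers sharing the AsyncSubject. One difference to keep in mind: retry(3) in the RxJava version means up to three resubscriptions after the first failure, whereas maxAttempts in this sketch counts total attempts.
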
戒情不戒烟
Reply #3 · 2019-04-09 08:39

I think this code is alright, good job :)

You were right to use Observable.create in your LoginTask, because otherwise the result of the call could be cached internally, and then retry wouldn't help much...

I think this is unnecessary, however, for the CachedLoginResult's Observable. There you can simplify your code by using the Observable.just and Observable.error utility methods, something like:

public Observable<LoginResult> getObservable() {
  if (hasCache()) {
      return Observable.just(lr);
  } else {
      return Observable.error(new RuntimeException("No cache"));
  }
}

Note: just stores the value you tell it to emit internally, so that resubscriptions will always produce this value. This is what I hinted at above: you shouldn't do Observable.just(performRequest()).retry(3), for example, because performRequest will only ever be called once.
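
To make the eager-versus-deferred point concrete without pulling in RxJava, here is a minimal plain-Java sketch. It is an analogy only: the EagerVsDeferred class and its retry and just helpers are hypothetical and merely imitate how retry resubscribes to its source and how just captures a value that has already been computed.

```java
import java.util.function.Supplier;

// Plain-Java analogy (not RxJava): why retrying an already-computed value cannot help.
// EagerVsDeferred, retry and just are hypothetical helpers for illustration.
class EagerVsDeferred {
    // Calls the source again on every attempt, the way retry resubscribes.
    static <T> T retry(Supplier<T> source, int maxAttempts) {
        RuntimeException last = new IllegalArgumentException("maxAttempts must be >= 1");
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return source.get();
            } catch (RuntimeException e) {
                last = e;
            }
        }
        throw last;
    }

    // Wraps a value that already exists, like Observable.just(value):
    // every call returns the same stored value and triggers no new work.
    static <T> Supplier<T> just(T value) {
        return () -> value;
    }
}
```

With a flaky Supplier named performRequest, EagerVsDeferred.retry(performRequest, 3) runs the request again on each attempt. In contrast, EagerVsDeferred.retry(EagerVsDeferred.just(performRequest.get()), 3) evaluates performRequest.get() exactly once, before retry is even entered, so a failure there propagates immediately and a success is simply replayed.
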
