I'm trying to implement this workflow with RxJava, but I'm not sure whether I'm misusing it or doing something wrong.
- User asks to login
- If a loginResult is available in cache then "emit" the cached LoginResult
- Else, actually perform the request to the web service and cache the result if everything is successful
- If an error occurs, retry at most 3 times; if it fails a 4th time, purge the cache.
Here is my full snippet of code.
public class LoginTask extends BaseBackground<LoginResult> {
    private static CachedLoginResult cachedLoginResult = new CachedLoginResult();
    private XMLRPCClient xmlrpcClient;
    private UserCredentialsHolder userCredentialsHolder;

    @Inject
    public LoginTask(XMLRPCClient client, UserCredentialsHolder userCredentialsHolder) {
        this.xmlrpcClient = client;
        this.userCredentialsHolder = userCredentialsHolder;
    }

    @Override
    public LoginResult performRequest() throws Exception {
        return UserApi.login(
                xmlrpcClient,
                userCredentialsHolder.getUserName(),
                userCredentialsHolder.getPlainPassword());
    }

    @Override
    public Observable<LoginResult> getObservable() {
        return cachedLoginResult.getObservable()
                .onErrorResumeNext(
                        Observable.create(
                                ((Observable.OnSubscribe<LoginResult>) subscriber -> {
                                    try {
                                        if (!subscriber.isUnsubscribed()) {
                                            subscriber.onNext(performRequest()); // actually performRequest
                                        }
                                        subscriber.onCompleted();
                                    } catch (Exception e) {
                                        subscriber.onError(e);
                                    }
                                })
                        )
                                .doOnNext(cachedLoginResult::setLoginResult)
                                .retry((attempts, t) -> attempts < 3)
                                .doOnError(throwable -> cachedLoginResult.purgeCache())
                );
    }

    private static class CachedLoginResult {
        private LoginResult lr = null;
        private long when = 0;

        private CachedLoginResult() {
        }

        public boolean hasCache() {
            return lr != null && when + TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES) > System.currentTimeMillis();
        }

        public void setLoginResult(LoginResult lr) {
            if (lr != null) {
                this.lr = lr;
                this.when = System.currentTimeMillis();
            }
        }

        public void purgeCache() {
            this.lr = null;
            this.when = 0;
        }

        public Observable<LoginResult> getObservable() {
            return Observable.create(new Observable.OnSubscribe<LoginResult>() {
                @Override
                public void call(Subscriber<? super LoginResult> subscriber) {
                    if (!subscriber.isUnsubscribed()) {
                        if (hasCache()) {
                            subscriber.onNext(lr);
                            subscriber.onCompleted();
                        } else {
                            subscriber.onError(new RuntimeException("No cache"));
                        }
                    }
                }
            });
        }
    }
}
Since I wasn't able to find any similar examples, and I started "playing" with RxJava just one day ago, I'm unsure of my implementation.
Thank you for your time.
If I understand correctly, you want to perform the login once and cache the result in a reactive manner? If so, here is an example of how I would do this:
I think this code is alright, good job :)
You were right to use Observable.create in your LoginTask, because otherwise the result of the call could be cached internally, and then retry wouldn't help much. I think this is unnecessary, however, for the CachedLoginResult's Observable. There you can simplify your code by using the Observable.just and Observable.error utility methods: emit the cached value with just when hasCache() is true, and signal the missing cache with error otherwise.
Note: just stores the value you tell it to emit internally, so resubscriptions will always produce this value. This is what I hinted at above: you shouldn't do Observable.just(performRequest()).retry(3), for example, because performRequest will only ever be called once.
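The difference between an eagerly evaluated value and a deferred call can be modelled without RxJava at all. The following is a minimal plain-Java sketch, not RxJava code: retry, flaky, callsWhenDeferred and callsWhenEager are hypothetical names invented for this illustration. The "eager" case stands in for Observable.just(performRequest()), where the request runs once before any retry logic exists; the "deferred" case stands in for wrapping the call in Observable.create, as the question does, so that every resubscription runs it again.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

// Library-free model of eager vs. deferred sources; every name here is hypothetical.
final class DeferredRetrySketch {

    /** Calls the source up to maxAttempts times and returns the first successful result. */
    static <T> T retry(Callable<T> source, int maxAttempts) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return source.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        throw last; // every attempt failed
    }

    /** A fake request that fails the first failuresBeforeSuccess times and counts its calls. */
    static Callable<String> flaky(AtomicInteger calls, int failuresBeforeSuccess) {
        return () -> {
            if (calls.incrementAndGet() <= failuresBeforeSuccess) {
                throw new IllegalStateException("request failed");
            }
            return "login-ok";
        };
    }

    /** Deferred source: the request itself is handed to retry, so it runs on every attempt. */
    static int callsWhenDeferred(int failuresBeforeSuccess, int maxAttempts) {
        AtomicInteger calls = new AtomicInteger();
        try {
            retry(flaky(calls, failuresBeforeSuccess), maxAttempts);
        } catch (Exception ignored) {
            // all attempts failed; only the call count matters here
        }
        return calls.get();
    }

    /** Eager source: the request runs once up front and retry only ever sees its stored result. */
    static int callsWhenEager(int failuresBeforeSuccess, int maxAttempts) {
        AtomicInteger calls = new AtomicInteger();
        try {
            String value = flaky(calls, failuresBeforeSuccess).call(); // evaluated exactly once
            Callable<String> replay = () -> value;                      // can only replay that value
            retry(replay, maxAttempts);
        } catch (Exception ignored) {
            // the failure escaped before retry was ever involved
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("deferred calls: " + callsWhenDeferred(2, 3)); // prints "deferred calls: 3"
        System.out.println("eager calls: " + callsWhenEager(2, 3));       // prints "eager calls: 1"
    }
}
```

With a request that fails twice, the deferred variant reaches the third, successful attempt, while the eager variant never calls the request a second time. The same reasoning applies to the cache check: if CachedLoginResult builds its Observable with just and error, hasCache() is evaluated when getObservable() is called rather than on subscription, which is fine as long as getObservable() is invoked anew for each login.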