Execute request or subscribe to the result with Rx

Posted 2019-09-02 18:45

Question:

I am trying to implement the following logic with RxJava:

  1. Execute a server request if I don't have the value locally.
  2. Provide the result to the subscriber.
  3. If a request is already running, do not create a second one; subscribe to the result of the running request instead.

The following solution partially solves the problem:

private final ExecutorService executor = Executors.newSingleThreadExecutor();

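// The stored value is a String (see getValueFromStorageOrBackend), so the types use String rather than T.
private Observable<String> getValue() {
    if (storage.getValue() == null) {
        Future<String> futureValue = executor.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {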
                return getValueFromStorageOrBackend();
            }
        });
        return Observable.from(futureValue);
    } else {
        return Observable.just(storage.getValue());
    }
}

private String getValueFromStorageOrBackend() {
    final String currentValue = storage.getValue();
    if (currentValue == null) {
        Response response = backend.requestValue();
        storage.setValue(response.getValue());

        return response.getValue();
    }

    return currentValue;
}

Is there an elegant pure RxJava solution for this?

回答1:

One of the simpler ways to do this is via AsyncSubject + a compare-and-set loop:

final AtomicReference<AsyncSubject<T>> subjectRef = new AtomicReference<>();

public Observable<T> get() {
    for (;;) {
        AsyncSubject<T> subject = subjectRef.get();
        if (subject != null) {
            return subject;
        }
        AsyncSubject<T> next = AsyncSubject.create();
        if (subjectRef.compareAndSet(null, next)) {
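            // Only the caller that won the compare-and-set starts the work.
            Observable.just(1).map(ignored -> {
                // your computation here
                return 2; // the result (placeholder)
            }).subscribeOn(Schedulers.io()).subscribe(next); // feed the new subject, not the null one
            return next;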
        }
    }
}
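
The compare-and-set idea itself does not depend on RxJava. As a rough illustration, here is a plain-JDK analogue that uses CompletableFuture in place of AsyncSubject. It is only a sketch of the pattern, not the RxJava solution: SharedRequest and loader are hypothetical names introduced for this example, and loader stands in for the backend call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical helper (not part of RxJava): shares one computation among all callers.
final class SharedRequest<T> {
    private final AtomicReference<CompletableFuture<T>> ref = new AtomicReference<>();
    private final Supplier<T> loader; // assumption: stands in for the backend request

    SharedRequest(Supplier<T> loader) {
        this.loader = loader;
    }

    CompletableFuture<T> get() {
        for (;;) {
            CompletableFuture<T> current = ref.get();
            if (current != null) {
                return current; // a request is running or has already finished
            }
            CompletableFuture<T> next = new CompletableFuture<>();
            if (ref.compareAndSet(null, next)) {
                // Only the thread that won the race starts the work.
                CompletableFuture.supplyAsync(loader).whenComplete((value, error) -> {
                    if (error != null) {
                        next.completeExceptionally(error);
                    } else {
                        next.complete(value);
                    }
                });
                return next;
            }
            // Lost the race: loop again and pick up the winner's future.
        }
    }

    public static void main(String[] args) {
        AtomicInteger loads = new AtomicInteger();
        SharedRequest<String> request = new SharedRequest<>(() -> {
            loads.incrementAndGet();
            return "value";
        });
        CompletableFuture<String> first = request.get();
        CompletableFuture<String> second = request.get();
        // prints "value value loads=1"
        System.out.println(first.join() + " " + second.join() + " loads=" + loads.get());
    }
}
```

Both callers receive the same future, so the loader runs once. As with the AsyncSubject version, a failed request stays cached until the reference is reset.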

An even simpler way is to use cache(), provided the same Observable instance is kept (for example in a field) and returned to every caller:

Observable
.just(1)
.subscribeOn(Schedulers.io())
.map(ignored -> {
    // computation here
    return 2;
}).cache();


Tags: java rx-java