Can a ListenableFuture chain handle inner Executio

Posted 2019-07-19 13:48

Question:

I am given an API (fnGrpc) that performs a gRPC call and returns a ListenableFuture that resolves to some value v. Its implementation is fixed and cannot be modified.

I want to provide a helper function (fnHelper) that:

  1. transforms the gRPC result and itself returns a ListenableFuture that resolves to the transformed value t1.

  2. handles failure of the gRPC call, and returns some other value t2 instead of having fnHelper's caller see an ExecutionException.

I can solve (1) by using Futures.transform():

package myHelper;

ListenableFuture<T> fnHelper() {
  return Futures.transform(fnGrpc(), new Function<V, T>() {
    @Override
    public T apply(V v) {
      T t1 = f(v);
      return t1;
    }
  });
} 

and the caller:

package myApp;

// ...

try {
  T t = fnHelper().get();
} catch (ExecutionException | InterruptedException ex) {
  // ...
}

How can I achieve (2) whilst still having fnHelper return a ListenableFuture and remain non-blocking?

I could have fnHelper itself create an additional thread within which I would call .get() on fnGrpc, but is there another way that avoids this additional thread?

Answer 1:

I'm not a Guava expert, but the same Futures utility class appears to cover this. Its catchingAsync method takes a function that returns a ListenableFuture holding the fallback value (t2):

ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync(originalFuture,
                       Exception.class, x -> immediateFuture(t2), executor);
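To see catchingAsync in isolation, here is a minimal sketch. The class name, failingCall (a stand-in for a failed gRPC call) and withFallback are hypothetical names for illustration, and directExecutor() is an assumed choice of executor, not something from the question.

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;

public class CatchingAsyncSketch {
    // Hypothetical stand-in for a gRPC call whose future has already failed.
    static ListenableFuture<Integer> failingCall() {
        return Futures.immediateFailedFuture(new IllegalStateException("rpc failed"));
    }

    // Returns a future that yields the input's value on success, or the
    // fallback value if the input failed with any Exception.
    static ListenableFuture<Integer> withFallback(ListenableFuture<Integer> input, int fallback) {
        return Futures.catchingAsync(
                input,
                Exception.class,
                ex -> Futures.immediateFuture(fallback), // AsyncFunction: returns a future
                MoreExecutors.directExecutor());         // assumed executor for the fallback
    }

    public static void main(String[] args) throws Exception {
        System.out.println(withFallback(failingCall(), -1).get());              // prints -1
        System.out.println(withFallback(Futures.immediateFuture(7), -1).get()); // prints 7
    }
}
```

The fallback only runs when the input future fails, so a successful result passes through unchanged.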

You should then be able to chain this with the transform method, which does the transformation:

ListenableFuture<T> fnHelper() {
    return Futures.catching(Futures.transform(fnGrpc(), new Function<V, T>() {
        @Override
        public T apply(V v) {
            T t1 = f(v);
            return t1;
        }
    }),
    // catching takes a plain Function, so the fallback returns t2 directly
    Exception.class, x -> t2);
}

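Note: In the last snippet I used catching instead of catchingAsync to stay consistent with the code in your question, and I did not specify an executor. Neither variant blocks the caller: the Async suffix only means that the function you pass returns a ListenableFuture rather than a plain value. Newer Guava versions require an explicit Executor argument for transform and catching, so there you would pass one, for example MoreExecutors.directExecutor().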
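Putting both pieces together, a self-contained sketch of the helper might look like the following. The types (Integer in, String out), the transformation f, the FALLBACK constant, and passing the gRPC future in as a parameter are all assumptions made so the example runs on its own. They are not part of the original API.

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Executor;

public class FnHelperSketch {
    // Hypothetical fallback value (t2 in the question).
    static final String FALLBACK = "t2";

    // Hypothetical transformation f(v) producing t1.
    static String f(Integer v) {
        return "t1:" + v;
    }

    // Transforms a successful result with f and maps any Exception to FALLBACK.
    // No thread is created and nothing blocks: both steps are callbacks.
    static ListenableFuture<String> fnHelper(ListenableFuture<Integer> grpcResult) {
        // Assumption: directExecutor() runs the callbacks on whichever thread
        // completes the input future; a dedicated executor suits heavier work.
        Executor executor = MoreExecutors.directExecutor();
        ListenableFuture<String> transformed =
                Futures.transform(grpcResult, FnHelperSketch::f, executor);
        return Futures.catching(transformed, Exception.class, ex -> FALLBACK, executor);
    }

    public static void main(String[] args) throws Exception {
        // Success path: prints t1:42
        System.out.println(fnHelper(Futures.immediateFuture(42)).get());
        // Failure path: prints t2
        System.out.println(
                fnHelper(Futures.immediateFailedFuture(new RuntimeException("rpc failed"))).get());
    }
}
```

Because catching wraps the transformed future, it also replaces exceptions thrown inside f with the fallback. If only gRPC failures should be masked, apply catching to the gRPC future first and transform afterwards, keeping in mind that the fallback must then be a value of the input type.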