Use the results of two Guava ListenableFutures of

2019-06-15 11:05发布

问题:

I have two ListenableFutures which are completed on other threads. Each future is of a different type, and I wish to use both of their results when they are both complete.

Is there an elegant way to handle this using Guava?

回答1:

Runnable listener = new Runnable() {
    private boolean jobDone = false;

    @Override
    public synchronized void run() {
        if (jobDone || !(future1.isDone() && future2.isDone())) {
            return;
        }
        jobDone = true;
        // TODO do your job
    }
};
future1.addListener(listener);
future2.addListener(listener);

Not really elegant, but should do the job.

Or, more elegant, but you'll need casts:

ListenableFuture<List<Object>> composedFuture = 
    Futures.allAsList(future1, future2);


回答2:

If you want some sort of type safety you can do the following:

class Composite {
  public A a;
  public B b;
}

public ListenableFuture<Composite> combine(ListenableFuture<A> futureA, 
                                           final ListenableFuture<B> futureB) {

  return Futures.transform(futureA, new AsyncFunction<A, Composite>() {
    public ListenableFuture<Composite> apply(final A a) throws Exception {
      return Futures.transform(futureB, new Function<B, Compisite>() {
        public Composite apply(B b) {
          return new Composite(a, b);
        }
      }
    } 
  }

}

ListenableFuture<A> futureA = ...
ListenableFuture<B> futureB = ...

ListenableFuture<Composite> result = combine(futureA, futureB);

In this case Composite can be a Pair<A, B> from Apache Commons if you like.

Also, a failure in either future will result in a failure in the resulting combined future.

Another solution would be to take a look at Trickle from the team at Spotify. The GitHub README has an example which shows a solution to a similar problem.

There are undoubtedly other solutions but this is the one that popped into my head.



回答3:

Since Guava v20.0 you can use:

ListenableFuture<CombinedResult> resultFuture =
   Futures.whenAllSucceed(future1, future2)
       .call(callableThatCombinesAndReturnsCombinedResult, executor);

Look at java docs example here



回答4:

If you want some type safety, you can combine result of 2 different independent tasks by using EventBus from the sister Guava com.google.common.eventbus package

For the example sake, let's assume that one of you Futures returns Integer and the other Double.

First, create an accumulator (other names builder, collector, etc) class that you will register as an event sink with EventBus. As you can see it's really a POJO which will hanlde Integer and Double events

class Accumulator
{
    Integer intResult;
    Double  doubleResult;

    @Subscribe // This annotation makes it an event handler
    public void setIntResult ( final Integer val )
    {
        intResult = val;
    }

    @Subscribe
    public void setDoubleResult ( final Double val )
    {
        doubleResult = val;
    }
}

Here is the implementation of the method that will take 2 futures and will combine them into an accumulator.

final ListenableFuture< Integer > future1 = ...;
final ListenableFuture< Double > future2 = ...;

final ImmutableList< ListenableFuture< ? extends Object> > futures =
    ImmutableList.< ListenableFuture<? extends Object> >of( future1, future2 );

final ListenableFuture< Accumulator > resultFuture =
    Futures.transform(
        // If you don't care about failures, use allAsList
        Futures.successfulAsList( futures ),
        new Function< List<Object>, Accumulator > ( )
        {
            @Override
            public Accumulator apply ( final List< Object > input )
            {
                final Accumulator accumulator = new Accumulator( );

                final EventBus eventBus = new EventBus( );
                eventBus.register( accumulator );

                for ( final Object cur: input )
                {
                    // Failed results will be set to null
                    if ( cur != null )
                    {
                        eventBus.post( cur );
                    }
                }

                return accumulator;
            }
        }
    );

final Accumulator accumulator = resultFuture.get( );


回答5:

Here is a simple example that would perform the addition of 2 listenable futures:

//Asynchronous call to get first value
final ListenableFuture<Integer> futureValue1 = ...;

//Take the result of futureValue1 and transform it into a function to get the second value
final AsyncFunction<Integer, Integer> getSecondValueAndSumFunction = new AsyncFunction<Integer, Integer>() {
    @Override
    public ListenableFuture<Integer> apply(final Integer value1) {

        //Asynchronous call to get second value
        final ListenableFuture<Integer> futureValue2 = ...;


        //Return the sum of the values
        final Function<Integer, Integer> addValuesFuture = new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer value2) {

                Integer sum = value1 + value2;
                return sum;
            }               
        };              

        //Transform the second value so its value can be added to the first
        final ListenableFuture<Integer> sumFuture = Futures.transform(futureValue2, addValuesFuture);   

        return sumFuture;
    }
};

final ListenableFuture<Integer> valueOnePlusValueTwo = Futures.transform(futureValue1, getSecondValueAndSumFunction);