How to use CompletableFuture without risking a Sta

2019-07-24 15:25发布

问题:

I want to walk the search space of an asynchronous function. I coded the logic as follows:

/**
 * Assuming that a function maps a range of inputs to the same output value, minimizes the input value while
 * maintaining the output value.
 *
 * @param previousInput the last input known to return {@code target}
 * @param currentInput  the new input value to evaluate
 * @param function      maps an input to an output value
 * @param target        the expected output value
 * @return the minimum input value that results in the {@code target} output value
 * <br>{@code @throws NullPointerException} if any argument is null
 * <br>{@code @throws IllegalArgumentException} if {@code stepSize} is zero}
 */
private static CompletionStage<BigDecimal> optimizeInput(BigDecimal previousInput,
                                                         BigDecimal currentInput,
                                                         BigDecimal stepSize,
                                                         Function<BigDecimal, CompletionStage<BigDecimal>> function,
                                                         BigDecimal target)
{
    return function.apply(currentInput).thenCompose(output ->
    {
        assertThat("stepSize", stepSize).isNotZero();
        int outputMinusTarget = output.compareTo(target);
        if (outputMinusTarget != 0)
            return CompletableFuture.completedFuture(previousInput);
        BigDecimal nextInput = currentInput.add(stepSize);
        if (nextInput.compareTo(BigDecimal.ZERO) < 0)
            return CompletableFuture.completedFuture(previousInput);
        return optimizeInput(currentInput, nextInput, stepSize, function, target);
    });
}

Unfortunately, if the function has a large search space this raises a StackoverflowError after some iterations. Is it possible to walk the search space iteratively, with a fixed-size stack?

回答1:

you have the following recursion structure

CompletableFuture<T> compute(...) {
  return asyncTask().thenCompose(t -> {
    if (...)
      return completedFuture(t);
    } else {
      return compute(...);
    }
  }
}

You can rewrite it avoiding completable future composition and its stack usage during completion.

CompletableFuture<T> compute(...) {
  CompletableFuture<T> result = new CompletableFuture<>();
  computeHelper(result, ...);
  return result;
}   

void computeHelper(CompletableFuture<T> result, ...) {
  asyncTask().thenAccept(t -> {
    if (...) {
      result.complete(t);
    } else {
      computeHelper(result, ...);
    }
  });
}

if asyncTask() is not really asynchronous and just use the current thread, you must replace thenAccept with one of its asynchronous versions to use the executor task queue instead of the thread stack.



回答2:

dfogni's answer should work fine -- but for completeness, it is possible to avoid doing the executor handoffs in the case where the method is synchronous using a trampolining type technique.

To make it easier, I've introduced a class that capture the state that is changing between iterations and introducing methods that implement your completion checks and generate the next state. I believe this is the same as your original logic, but you can triple check.

private static CompletionStage<BigDecimal> optimizeInput(BigDecimal previousInput,
                                                          BigDecimal currentInput,
                                                          BigDecimal stepSize,
                                                          Function<BigDecimal, CompletionStage<BigDecimal>> function,
                                                          BigDecimal target) {
    class State {
        BigDecimal prev;
        BigDecimal curr;
        BigDecimal output;

        State(BigDecimal prev, BigDecimal curr, BigDecimal output) {
            this.prev = prev;
            this.curr = curr;
            this.output = output;
        }

        boolean shouldContinue() {
            return output.compareTo(target) == 0 && curr.add(stepSize).compareTo(BigDecimal.ZERO) >= 0;
        }

        CompletionStage<State> next() {
            BigDecimal nextInput = curr.add(stepSize);
            return function.apply(nextInput).thenApply(nextOutput -> new State(curr, nextInput, nextOutput));
        }
    }

    /* Now it gets complicated... we have to check if we're running on the same thread we were called on. If we
     * were, instead of recursively calling `next()`, we'll use PassBack to pass our new state back 
     * to the stack that called us.
     */
    class Passback {
        State state = null;
        boolean isRunning = true;

        State poll() {
            final State c = this.state;
            this.state = null;
            return c;
        }
    }
    class InputOptimizer extends CompletableFuture<BigDecimal> {
        void optimize(State state, final Thread previousThread, final Passback previousPassback) {
            final Thread currentThread = Thread.currentThread();

            if (currentThread.equals(previousThread) && previousPassback.isRunning) {
                // this is a recursive call, our caller will run it
                previousPassback.state = state;
            } else {
                Passback passback = new Passback();
                State curr = state;
                do {
                    if (curr.shouldContinue()) {
                        curr.next().thenAccept(next -> optimize(next, currentThread, passback));
                    } else {
                        complete(curr.prev);
                        return;
                    }
                // loop as long as we're making synchronous recursive calls
                } while ((curr = passback.poll()) != null);
                passback.isRunning = false;
            }
        }
    }

    InputOptimizer ret = new InputOptimizer();
    function.apply(currentInput)
            .thenAccept(output -> ret.optimize(
                    new State(previousInput, currentInput, output),
                    null, null));
    return ret;
}

Ok, so it's pretty complicated. Also, note that this requires your function will never throw an exception or complete exceptionally which could be problematic. You can generify this so you only have to write it once though (with correct exception handling), which can be found in the asyncutil library (Disclaimer: I am a co-author of this library). There might be other libraries with similar functionality, most likely a mature reactive library like Rx. Using asyncutil,

 private static CompletionStage<BigDecimal> optimizeInput(BigDecimal previousInput,
                                                          BigDecimal currentInput,
                                                          BigDecimal stepSize,
                                                          Function<BigDecimal, CompletionStage<BigDecimal>> function,
                                                          BigDecimal target) {
    // ... State class from before
    return function
            .apply(currentInput)
            .thenCompose(output -> AsyncTrampoline.asyncWhile(
                    State::shouldContinue, 
                    State::next, 
                    new State(previousInput, currentInput, output)))
            .thenApply(state -> state.prev);    
}