Sequential composition for arbitrary number of cal

2019-07-16 07:39发布

问题:

We use Futures in vertx in examples like:

Future<JsonObject> fetchVehicle = getUserBookedVehicle(routingContext, client);

        fetchVehicle.compose(vehicleJson -> vehicleDoor(routingContext, client, vehicleJson, lock)).setHandler(
                asyncResult -> {
                    if (asyncResult.succeeded()) {
                    LOG.info("Door operation succeeded with result {}", asyncResult.result().encode());
                    handler.handle(Future.succeededFuture(new AsyncReply(200, "OK")));
                }
                else {
                    handler.handle(Future.failedFuture(asyncResult.cause()));
                }
        });

where we handle 2 calls for example.

OR I have another snippet where I can handle any number of methods:

List<Future> futures = new ArrayList<>();
        conversation.getRequestList().forEach(req -> {
            Future<Message<Object>> senderFuture = Future.future();
            vertx.eventBus().send(AbstractOEMClientVerticle.ADDRESS, JsonObject.mapFrom(req), deliveryOptions, senderFuture.completer());

            // sent successfully. save the replyAddress and the conversation for later/callback
            log.info("Saving the conversation for the request.", conversation.getReplyAddress());
            pendingCommands.put(req.getBody().getString(MSG_ID), conversation);

            futures.add(senderFuture);
        });

        CompositeFuture.all(futures).setHandler(ar -> {
            if (ar.succeeded()) {
                handler.handle(Future.succeededFuture());
            } else {
                log.error("forwardToVWClient VW got result : {}", ar.cause());
                handler.handle(Future.failedFuture(ar.cause()));
            }
        });

Here we are chaining all the requests in the conversation.getRequestList() without knowing their count in advance.

But the shortcoming of .all() method is that, we have no control on the order.

How can I chain any number of methods with Vertx Futures (without knowing the exact count of the calls) ?

EDIT:

The official guide talks about sequential composition but the example given has 3 calls. It does not explain how to do it for arbitrary number of calls.

See "Sequential composition" in http://vertx.io/docs/vertx-core/java/

I hope it is clear.

回答1:

If you want to feed the response from the previous request to the next request, and suppose you have different handlers for each response. You can add a helper method

private <T> Future<T> chain(Future<T> init, List<Function<T, Future<T>>> handlers) {
    Future<T> result = init;
    for (Function<T, Future<T>> handler : handlers) {
        result = result.compose(handler);
    }
    return result;
}

And then change your code like this

    Future<JsonObject> fetchVehicle = getUserBookedVehicle(routingContext, client);

    Function<JsonObject, Future<JsonObject>> vehicleResponseHandler = vehicleJson ->
        vehicleDoor(routingContext, client, vehicleJson, lock);

    Function<JsonObject, Future<JsonObject>> anotherTrivialHandler = someJsonObj -> {
        // add here new request by using information from someJsonObj
        LOG.info("Hello from trivial handler {} ", someJsonObj);
        return Future.succeededFuture(someJsonObj);
    };

    List<Function<JsonObject, Future<JsonObject>>> handlers = new ArrayList<>();

    handlers.add(vehicleResponseHandler);
    handlers.add(anotherTrivialHandler);

    chain(fetchVehicle, handlers).setHandler( asyncResult -> {
        if (asyncResult.succeeded()) {
            handler.handle(Future.succeededFuture(new AsyncReply(200, "OK")));
        } else {
            handler.handle(Future.failedFuture(asyncResult.cause()));
        }
    });

But there is a limitation for this implementation which requires each chained Future must have the same type parameter T.



回答2:

Here is a solution using map & reduce that executes a method in an orderly fashion and returns the accumulated result in the form of a Future<String>

 public static <T> Future<String> chainCall(List<T> list, Function<T, Future<String>> method){
        return list.stream().reduce(Future.succeededFuture(),// the initial "future"
                (acc, item) -> acc.compose(v -> method.apply(item)), // we return the compose of the previous "future" with "future" returned by next item processing
                (a,b) -> Future.future()); // not used! only useful for parallel stream.
    }

can be used as in the example below:

 chainCall(conversation.getRequestList(), this::sendApiRequestViaBus);

where sendApiRequestViaBus is:

/**
     * @param request The request to process
     * @return The result of the request processing. 
     */
    Future<String> sendApiRequestViaBus(ApiRequest request) {
        Future<String> future = Future.future();
        String address = CommandUtilsFactory.getInstance(request.getImplementation()).getApiClientAddress();
        log.debug("Chain call start msgId {}", request.getId());

        vertx.eventBus().send(address, JsonObject.mapFrom(request), deliveryOptions, res -> {
            log.debug("Chain call returns {}", request.getId());
            if (res.succeeded()) {
                future.complete("OK");
            } else {
                future.fail("KO");
            }
        });
        return future;
    }

I hope it helps.



回答3:

Here's something handy. Hope it helps.

public static <R> Future<List<R>> allOfFutures(List<Future<R>> futures) {
    return CompositeFutureImpl.all(futures.toArray(new Future[futures.size()]))
            .map(v -> futures.stream()
                    .map(Future::result)
                    .collect(Collectors.toList())
            );
}


标签: future vert.x