How to divide one CompletableFuture into many CompletableFutures

Posted 2019-05-06 17:05

Question:

For example, I have these methods:

public CompletableFuture<Page> getPage(int i) {
    ...
}
public CompletableFuture<Document> getDocument(int i) {
    ...
}
public CompletableFuture<Void> parseLinks(Document doc) {
    ...
}

And my flow:

List<CompletableFuture> list = IntStream
    .range(0, 10)
    .mapToObj(i -> getPage(i))

    // I want a method like this:
    .thenApplyAndSplit((CompletableFuture<Page> page) -> {
        List<CompletableFuture<Document>> docs = page.getDocsId()
            .stream()
            .map(i -> getDocument(i))
            .collect(Collectors.toList());
        return docs;
    })
    .map((CompletableFuture<Document> future) -> {
        return future.thenApply((Document doc) -> parseLinks(doc));
    })
    .collect(Collectors.toList());

It should be something like flatMap() for CompletableFuture. I want to implement this flow:

List<Integer> -> Stream<CompletableFuture<Page>>
              -> Stream<CompletableFuture<Document>>
              -> parse each
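For reference, the closest thing CompletableFuture itself has to flatMap() is thenCompose(): it takes a function that returns another future and flattens the nesting. The sketch below is only an illustration under stated assumptions: fetchPageDocIds() and fetchDocument() are hypothetical stand-ins for getPage() and getDocument(), a "page" is just a list of document ids and a "document" is just a string. It fans one page out into several document futures and joins them back into one future with CompletableFuture.allOf().

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class ComposeSketch {
    // Hypothetical stand-in for getPage(i): a "page" is just the list of its document ids.
    static CompletableFuture<List<Integer>> fetchPageDocIds(int page) {
        return CompletableFuture.supplyAsync(() -> Arrays.asList(page * 10, page * 10 + 1, page * 10 + 2));
    }

    // Hypothetical stand-in for getDocument(i): a "document" is just a string.
    static CompletableFuture<String> fetchDocument(int id) {
        return CompletableFuture.supplyAsync(() -> "doc-" + id);
    }

    // thenCompose avoids ending up with CompletableFuture<CompletableFuture<...>>:
    // one page -> many document futures -> one future holding all documents.
    static CompletableFuture<List<String>> documentsOfPage(int page) {
        return fetchPageDocIds(page).thenCompose(ids -> {
            List<CompletableFuture<String>> docs = ids.stream()
                    .map(ComposeSketch::fetchDocument)
                    .collect(Collectors.toList());
            return CompletableFuture.allOf(docs.toArray(new CompletableFuture[0]))
                    .thenApply(v -> docs.stream()
                            .map(CompletableFuture::join) // all are done here, so join() does not block
                            .collect(Collectors.toList()));
        });
    }

    public static void main(String[] args) {
        System.out.println(documentsOfPage(1).join()); // [doc-10, doc-11, doc-12]
    }
}
```

The order of the resulting list follows the order of the ids, not the order in which the document futures complete.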

UPDATE

Stream<CompletableFuture<Page>> pagesCFS = IntStream
        .range(0, 10)
        .mapToObj(i -> getPage(i));

Stream<CompletableFuture<Document>> documentCFS = pagesCFS.flatMap(page -> {
    // How to return stream of Document when page finishes?
    // page.thenApply( ... )
});

Answer 1:

I also wanted to give a shot at implementing a Spliterator for streams of CompletableFutures, so here is my attempt.

Note that if you use this in parallel mode, you should use a different ForkJoinPool for the stream than for the tasks running behind the CompletableFutures. The stream waits for the futures to complete, so if both share the same executor you could actually lose performance, or even run into deadlocks.

So here is the implementation:

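import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.ListIterator;
import java.util.Spliterator;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// The methods and the nested class below go inside an enclosing utility class.
public static <T> Stream<T> flattenStreamOfFutures(Stream<CompletableFuture<? extends T>> stream, boolean parallel) {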
    return StreamSupport.stream(new CompletableFutureSpliterator<T>(stream), parallel);
}

public static <T> Stream<T> flattenStreamOfFuturesOfStream(Stream<CompletableFuture<? extends Stream<T>>> stream,
                                                           boolean parallel) {
    return flattenStreamOfFutures(stream, parallel).flatMap(Function.identity());
}

public static class CompletableFutureSpliterator<T> implements Spliterator<T> {
    private List<CompletableFuture<? extends T>> futures;

    CompletableFutureSpliterator(Stream<CompletableFuture<? extends T>> stream) {
        futures = stream.collect(Collectors.toList());
    }

    CompletableFutureSpliterator(CompletableFuture<T>[] futures) {
        this.futures = new ArrayList<>(Arrays.asList(futures));
    }

    CompletableFutureSpliterator(final List<CompletableFuture<? extends T>> futures) {
        this.futures = new ArrayList<>(futures);
    }

    @Override
    public boolean tryAdvance(final Consumer<? super T> action) {
        if (futures.isEmpty())
            return false;
        CompletableFuture.anyOf(futures.stream().toArray(CompletableFuture[]::new)).join();
        // now at least one of the futures has finished, get its value and remove it
        ListIterator<CompletableFuture<? extends T>> it = futures.listIterator(futures.size());
        while (it.hasPrevious()) {
            final CompletableFuture<? extends T> future = it.previous();
            if (future.isDone()) {
                it.remove();
                action.accept(future.join());
                return true;
            }
        }
        throw new IllegalStateException("Should not reach here");
    }

    @Override
    public Spliterator<T> trySplit() {
        if (futures.size() > 1) {
            int middle = futures.size() >>> 1;
            // relies on the constructor copying the list, as it gets modified in place
            Spliterator<T> result = new CompletableFutureSpliterator<>(futures.subList(0, middle));
            futures = futures.subList(middle, futures.size());
            return result;
        }
        return null;
    }

    @Override
    public long estimateSize() {
        return futures.size();
    }

    @Override
    public int characteristics() {
        return IMMUTABLE | SIZED | SUBSIZED;
    }
}

It works by collecting the given Stream<CompletableFuture<T>> into a List of those futures. It assumes that building the stream is fast and that the hard work is done by the futures themselves, so making a list out of it shouldn't be costly. This also ensures that all tasks are already triggered, since it forces the source stream to be fully processed.

To generate the output stream, it simply waits for any future to complete and then streams that future's value.
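The same "emit in completion order" idea can be sketched without a custom Spliterator. This is a swapped-in technique, not the answer's implementation: each future puts itself on a LinkedBlockingQueue the moment it completes (via whenComplete), and the consumer takes exactly as many entries as there are futures. completionQueue() and take() are hypothetical helper names; a failed future is rethrown by join() when it is taken.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

class CompletionOrder {
    // Attaches a callback to every future so that each one is put on the
    // returned queue at the moment it completes (normally or exceptionally).
    static <T> BlockingQueue<CompletableFuture<T>> completionQueue(List<CompletableFuture<T>> futures) {
        BlockingQueue<CompletableFuture<T>> done = new LinkedBlockingQueue<>();
        for (CompletableFuture<T> f : futures) {
            f.whenComplete((value, error) -> done.add(f));
        }
        return done;
    }

    // Takes n results off the queue, blocking until each is available,
    // so values come out in completion order rather than list order.
    static <T> List<T> take(BlockingQueue<CompletableFuture<T>> done, int n) {
        List<T> results = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            try {
                results.add(done.take().join()); // join() rethrows a failure as CompletionException
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        CompletableFuture<String> slow = new CompletableFuture<>();
        CompletableFuture<String> fast = new CompletableFuture<>();
        BlockingQueue<CompletableFuture<String>> done = completionQueue(Arrays.asList(slow, fast));
        fast.complete("fast"); // completes first even though it is second in the list
        slow.complete("slow");
        System.out.println(take(done, 2)); // [fast, slow]
    }
}
```

Compared with the Spliterator, this does not support splitting for parallel streams, but it avoids rebuilding an anyOf() future on every element.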

A simple non-parallel usage example (the executor is used for the CompletableFutures, in order to start them all at the same time):

ExecutorService executor = Executors.newFixedThreadPool(20);
long start = System.currentTimeMillis();
flattenStreamOfFutures(IntStream.range(0, 20)
        .mapToObj(i -> CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep((i % 10) * 1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            System.out.println("Finished " + i + " @ " + (System.currentTimeMillis() - start) + "ms");
            return i;
        }, executor)), false)
        .forEach(x -> {
            System.out.println(Thread.currentThread().getName() + " @ " + (System.currentTimeMillis() - start) + "ms handle result: " + x);
        });
executor.shutdown();

Output:

Finished 10 @ 103ms
Finished 0 @ 105ms
main @ 114ms handle result: 10
main @ 114ms handle result: 0
Finished 1 @ 1102ms
main @ 1102ms handle result: 1
Finished 11 @ 1104ms
main @ 1104ms handle result: 11
Finished 2 @ 2102ms
main @ 2102ms handle result: 2
Finished 12 @ 2104ms
main @ 2105ms handle result: 12
Finished 3 @ 3102ms
main @ 3102ms handle result: 3
Finished 13 @ 3104ms
main @ 3105ms handle result: 13
…

As you can see, the stream produces each value almost as soon as its future completes, even though the futures do not complete in order.

Applying it to the example in the question gives the following (assuming parseLinks() returns a CompletableFuture<String> instead of CompletableFuture<Void>):

flattenStreamOfFuturesOfStream(IntStream.range(0, 10)
                .mapToObj(this::getPage)
                // the next map() will give a Stream<CompletableFuture<Stream<String>>>
                // hence the need for flattenStreamOfFuturesOfStream()
                .map(pcf -> pcf
                        .thenApply(page -> flattenStreamOfFutures(page
                                        .getDocsId()
                                        .stream()
                                        .map(this::getDocument)
                                        .map(docCF -> docCF.thenCompose(this::parseLinks)),
                                false))),
        false)
.forEach(System.out::println);


Answer 2:

Do you really have to use Streams? Can't you just attach dependent actions to your CompletableFutures? Especially since your last call returns a CompletableFuture<Void>. (Of course, it would also be possible to use Collection.forEach.)

List<CompletableFuture<Page>> completableFutures = IntStream
      .range(0, 10)
      .mapToObj(i -> getPage(i)).collect(Collectors.toList());

for (CompletableFuture<Page> page : completableFutures) {
    page.thenAccept(p -> {
        List<Integer> docsId = p.getDocsId();
        for (Integer integer : docsId) {
            getDocument(integer).thenAccept(d-> parseLinks(d));
        }
    });
}

EDIT: I made another attempt, but I am not sure it is a good idea, since I am not an expert on CompletableFuture.

Using the following method (there may be a better implementation):

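public static <T> CompletableFuture<Stream<T>> flatMapCF(Stream<CompletableFuture<T>> stream) {
    // collect first, so that every future is created (and started) right away
    List<CompletableFuture<T>> futures = stream.collect(Collectors.toList());
    // completes once all futures are done; join() then no longer blocks
    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(v -> futures.stream().map(CompletableFuture::join));
}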


Stream<CompletableFuture<Page>> pagesCFS = IntStream
        .range(0, 10)
        .mapToObj(i -> getPage(i));

CompletableFuture<Stream<Page>> pageCF = flatMapCF(pagesCFS);

CompletableFuture<Stream<Document>> docCF =
   pageCF.thenCompose(a ->
        flatMapCF(a.flatMap(
                b -> b.getDocsId()
                        .stream()
                        .map(c -> getDocument(c))
        )));

The problem is probably that this CompletableFuture completes only when all results are available.
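Waiting for all results is exactly what CompletableFuture.allOf() provides, and it is the usual way to turn a list of futures into a single future of a list. The minimal sketch below uses sequence() as a hypothetical helper name. It also shows a related consequence: if any input future fails, the combined future completes exceptionally, so one failed document hides the results of all the others.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class Sequence {
    // Turns List<CompletableFuture<T>> into CompletableFuture<List<T>>.
    // The result completes only when every input has completed, and it
    // completes exceptionally if any input completes exceptionally.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join) // every future is done here, so join() does not block
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> one = CompletableFuture.completedFuture(1);
        CompletableFuture<Integer> two = CompletableFuture.supplyAsync(() -> 2);
        System.out.println(sequence(Arrays.asList(one, two)).join()); // [1, 2]

        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        System.out.println(sequence(Arrays.asList(one, failed)).isCompletedExceptionally()); // true
    }
}
```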



Answer 3:

If you do not care about when the operation finishes, the following will simply trigger parseLinks() on all documents:

IntStream.range(0, 10)
        .mapToObj(this::getPage)
        .forEach(pcf -> pcf
                .thenAccept(page -> page
                        .getDocsId()
                        .stream()
                        .map(this::getDocument)
                        .forEach(docCF -> docCF.thenCompose(this::parseLinks))));

Otherwise, since your last operation returns a CompletableFuture<Void>, I assume you are mainly interested in knowing when everything has completed. You could do something like this:

CompletableFuture<Void> result = CompletableFuture.allOf(IntStream.range(0, 10)
        .mapToObj(this::getPage)
        .map(pcf -> pcf
                .thenCompose(page -> CompletableFuture.allOf(page
                        .getDocsId()
                        .stream()
                        .map(docId -> getDocument(docId)
                                .thenCompose(this::parseLinks))
                        .toArray(CompletableFuture[]::new))))
        .toArray(CompletableFuture[]::new));
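To check that result really completes only after every parseLinks() call, the sketch below runs the same nested allOf() structure against hypothetical in-memory stubs: each Page has three document ids, Document only holds its id, and parseLinks() merely increments a counter. None of these stubs come from the question; they exist only to make the example self-contained.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

class NestedAllOf {
    // Hypothetical stubs standing in for the question's types and methods.
    static final class Page {
        final List<Integer> docIds;
        Page(List<Integer> docIds) { this.docIds = docIds; }
        List<Integer> getDocsId() { return docIds; }
    }

    static final class Document {
        final int id;
        Document(int id) { this.id = id; }
    }

    static final AtomicInteger parsed = new AtomicInteger();

    static CompletableFuture<Page> getPage(int i) {
        return CompletableFuture.supplyAsync(() -> new Page(Arrays.asList(3 * i, 3 * i + 1, 3 * i + 2)));
    }

    static CompletableFuture<Document> getDocument(int id) {
        return CompletableFuture.supplyAsync(() -> new Document(id));
    }

    static CompletableFuture<Void> parseLinks(Document doc) {
        return CompletableFuture.runAsync(parsed::incrementAndGet);
    }

    // Outer allOf over pages; each page is composed with an inner allOf over its documents.
    static CompletableFuture<Void> processAll(int pages) {
        return CompletableFuture.allOf(IntStream.range(0, pages)
                .mapToObj(NestedAllOf::getPage)
                .map(pcf -> pcf.thenCompose(page -> CompletableFuture.allOf(page.getDocsId().stream()
                        .map(docId -> getDocument(docId).thenCompose(NestedAllOf::parseLinks))
                        .toArray(CompletableFuture[]::new))))
                .toArray(CompletableFuture[]::new));
    }

    public static void main(String[] args) {
        processAll(10).join();
        System.out.println(parsed.get()); // 30: 10 pages x 3 documents, all parsed before join() returns
    }
}
```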

If you are interested in the results of individual CompletableFutures, the best approach is probably to process them directly within the stream, where they are created.

You could even wrap all of this in a reusable method. For example, if parseLinks() returned a CompletableFuture<List<String>>, you could define a method like this:

public CompletableFuture<Void> processLinks(Function<? super CompletableFuture<List<String>>, ? extends CompletableFuture<?>> processor) {
    return CompletableFuture.allOf(IntStream.range(0, 10)
            .mapToObj(this::getPage)
            .map(pcf -> pcf
                    .thenCompose(page -> CompletableFuture.allOf(page
                            .getDocsId()
                            .stream()
                            .map(docId -> getDocument(docId)
                                    .thenCompose(this::parseLinks))
                            .map(processor) // here we apply the received function
                            .toArray(CompletableFuture[]::new))))
            .toArray(CompletableFuture[]::new));
}

and process the resulting lists like this:

processLinks(linksCF -> linksCF
        .thenAccept(links -> links.forEach(System.out::println)));

The returned CompletableFuture would complete once all links have been printed.
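The reason the returned future waits for the printing is that allOf() is applied to the futures returned by the processor, not to the source futures. A reduced sketch of just that hook, with processAll() as a hypothetical generic name and already-available link lists in place of the getPage()/getDocument() pipeline:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;

class ProcessorHook {
    // allOf covers the processor's output futures, so the result completes
    // only after every processing step (not just every source) has finished.
    static <T> CompletableFuture<Void> processAll(List<CompletableFuture<T>> sources,
            Function<? super CompletableFuture<T>, ? extends CompletableFuture<?>> processor) {
        return CompletableFuture.allOf(sources.stream()
                .map(processor)
                .toArray(CompletableFuture[]::new));
    }

    public static void main(String[] args) {
        Queue<String> printed = new ConcurrentLinkedQueue<>();
        List<CompletableFuture<List<String>>> links = Arrays.asList(
                CompletableFuture.supplyAsync(() -> Arrays.asList("a", "b")),
                CompletableFuture.supplyAsync(() -> Arrays.asList("c")));
        processAll(links, linksCF -> linksCF.thenAccept(printed::addAll)).join();
        System.out.println(printed.size()); // 3
    }
}
```

If the processor returned the source future unchanged (or discarded the thenAccept() future), join() could return before the links were consumed.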