Execute multiple queries in parallel via Streams

Posted 2019-02-03 18:32

Question:

I have the following method:

public String getResult() {
    List<String> serversList = getServerListFromDB();
    List<String> appList = getAppListFromDB();
    List<String> userList = getUserFromDB();

    return getResult(serversList, appList, userList);
}

Here I call three methods sequentially. Each one hits the DB and fetches results, and then I post-process the results from those DB calls. I know how to call these three methods concurrently using Threads, but I would like to use a Java 8 parallel Stream to achieve this. Can someone please show me how to do the same with parallel Streams?

EDIT: I just want to call the methods in parallel via a Stream.

private void getInformation() {
    method1();
    method2();
    method3();
    method4();
    method5();
}

Answer 1:

You may utilize CompletableFuture this way:

public String getResult() {

    // Create Stream of tasks:
    Stream<Supplier<List<String>>> tasks = Stream.of(
            () -> getServerListFromDB(),
            () -> getAppListFromDB(),
            () -> getUserFromDB());

    List<List<String>> lists = tasks
         // Supply all the tasks for execution and collect CompletableFutures
         .map(CompletableFuture::supplyAsync).collect(Collectors.toList())
         // Join all the CompletableFutures to gather the results
         .stream()
         .map(CompletableFuture::join).collect(Collectors.toList());

    // Use the results. They are guaranteed to be ordered in the same way as the tasks
    return getResult(lists.get(0), lists.get(1), lists.get(2));
}


Answer 2:

forEach is the operation intended for side effects, and you can call forEach on a parallel stream. For example:

listOfTasks.parallelStream().forEach(list -> {
    submitToDb(list);
});

However, parallelStream runs on the common ForkJoinPool, which is shared across the JVM and sized for CPU-bound work, so it is arguably not a good fit for IO-bound tasks such as DB calls.
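Applied to the question, the parallel-stream approach could look like the minimal sketch below. The class name ParallelSupplierSketch and the three stub methods are assumptions standing in for the real DB calls; the blocking work still runs on the common ForkJoinPool, so the caveat above applies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;

class ParallelSupplierSketch {

    // Hypothetical stand-ins for the three DB calls from the question
    static List<String> getServerListFromDB() { return Arrays.asList("server1", "server2"); }
    static List<String> getAppListFromDB() { return Arrays.asList("app1"); }
    static List<String> getUserFromDB() { return Arrays.asList("alice", "bob"); }

    static List<List<String>> fetchAll() {
        // Wrap each call in a Supplier so the stream can invoke it lazily
        List<Supplier<List<String>>> tasks = new ArrayList<>();
        tasks.add(ParallelSupplierSketch::getServerListFromDB);
        tasks.add(ParallelSupplierSketch::getAppListFromDB);
        tasks.add(ParallelSupplierSketch::getUserFromDB);

        // map() may run the suppliers on different common-pool threads;
        // collecting to a list keeps the encounter order of the task list
        return tasks.parallelStream()
                .map(Supplier::get)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<String>> lists = fetchAll();
        System.out.println(lists.get(0) + " " + lists.get(1) + " " + lists.get(2));
    }
}
```

Because the results come back in task order, lists.get(0), lists.get(1) and lists.get(2) can be passed straight to getResult as in the question.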

Consider using CompletableFuture and supplying an appropriate ExecutorService. This gives you more flexibility (continuations, pool configuration). For example:

ExecutorService executorService = Executors.newCachedThreadPool();
List<CompletableFuture<String>> allFutures = new ArrayList<>();
for (Query query : queries) {
    // The future needs its own name; it cannot reuse the loop variable "query"
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        // submit query to db; executeQuery is a placeholder for your own DB call
        return executeQuery(query);
    }, executorService);
    allFutures.add(future);
}

// Completes once every query future has completed
CompletableFuture<Void> all = CompletableFuture.allOf(allFutures.toArray(new CompletableFuture[0]));
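The future returned by allOf carries no values, so the individual results still have to be read from each future afterwards. A minimal sketch of that step follows; the class name AllOfSketch and the runQuery stub are assumptions, and plain strings stand in for the Query type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class AllOfSketch {

    // Hypothetical stand-in for a real DB call
    static String runQuery(String query) {
        return "result of " + query;
    }

    static List<String> runAll(List<String> queries) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        try {
            List<CompletableFuture<String>> allFutures = new ArrayList<>();
            for (String query : queries) {
                allFutures.add(
                        CompletableFuture.supplyAsync(() -> runQuery(query), executorService));
            }

            CompletableFuture<Void> all =
                    CompletableFuture.allOf(allFutures.toArray(new CompletableFuture[0]));

            // thenApply runs only after every future is done,
            // so the join() calls inside it do not block
            return all.thenApply(ignored -> allFutures.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList()))
                    .join();
        } finally {
            // Let the pool threads terminate once the submitted tasks are done
            executorService.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> queries = new ArrayList<>();
        queries.add("q1");
        queries.add("q2");
        System.out.println(runAll(queries));
    }
}
```

The results are collected in the order the futures were created, regardless of which query finishes first.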


Answer 3:

As already mentioned, a standard parallel stream is probably not the best fit for your use case. I would complete each task asynchronously using an ExecutorService and "join" them when calling the getResult method:

ExecutorService es = Executors.newFixedThreadPool(3);
try {
    Future<List<String>> serversList = es.submit(() -> getServerListFromDB());
    Future<List<String>> appList = es.submit(() -> getAppListFromDB());
    Future<List<String>> userList = es.submit(() -> getUserFromDB());

    // get() blocks until the result is ready; it throws InterruptedException and ExecutionException
    return getResult(serversList.get(), appList.get(), userList.get());
} finally {
    es.shutdown(); // release the pool threads
}
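A variation on the same idea, offered only as a sketch: ExecutorService.invokeAll submits a whole list of Callables and returns their futures in the same order. The class name InvokeAllSketch and the stub methods are assumptions, and the checked exceptions are wrapped in IllegalStateException purely to keep the example short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllSketch {

    // Hypothetical stand-ins for the three DB calls from the question
    static List<String> getServerListFromDB() { return Arrays.asList("server1", "server2"); }
    static List<String> getAppListFromDB() { return Arrays.asList("app1"); }
    static List<String> getUserFromDB() { return Arrays.asList("alice", "bob"); }

    static List<List<String>> fetchAll() {
        List<Callable<List<String>>> tasks = new ArrayList<>();
        tasks.add(InvokeAllSketch::getServerListFromDB);
        tasks.add(InvokeAllSketch::getAppListFromDB);
        tasks.add(InvokeAllSketch::getUserFromDB);

        ExecutorService es = Executors.newFixedThreadPool(tasks.size());
        try {
            List<List<String>> results = new ArrayList<>();
            // invokeAll blocks until all tasks have finished and
            // returns the futures in the same order as the task list
            for (Future<List<String>> future : es.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            // A task threw; the original exception is available via getCause()
            throw new IllegalStateException(e.getCause());
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchAll());
    }
}
```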


Answer 4:

It is not quite clear what you mean, but if you just want to run some processing on these lists in parallel, you can do something like this:

    List<String> list1 = Arrays.asList("1", "234", "33");

    List<String> list2 = Arrays.asList("a", "b", "cddd");

    List<String> list3 = Arrays.asList("1331", "22", "33");

    List<List<String>> listOfList = Arrays.asList(list1, list2, list3);

    listOfList.parallelStream().forEach(list -> System.out.println(list.stream().max((o1, o2) -> Integer.compare(o1.length(), o2.length()))));

(This prints the longest element of each list wrapped in an Optional, i.e. Optional[234], Optional[cddd] and Optional[1331], in no guaranteed order.)