Java 8 parallelStream findFirst

Published 2019-06-22 09:07

Suppose we have a list of workers like this:

List<Worker> workers = new ArrayList<>();
workers.add(new Worker(1));
workers.add(new Worker(2));
workers.add(new Worker(3));
workers.add(new Worker(4));
workers.add(new Worker(5));

I want to find the first worker to finish his job, so:

Worker first = workers.parallelStream().filter(Worker::finish).findFirst().orElse(null);

But there's a problem: I don't want to wait for all workers to finish their jobs and then pick the first one. I want the first worker as soon as he has finished his job!

import java.util.ArrayList;
import java.util.List;

public class Test {

    public static void main(String[] args) {
        List<Worker> workers = new ArrayList<>();
        workers.add(new Worker(1));
        workers.add(new Worker(2));
        workers.add(new Worker(3));
        workers.add(new Worker(4));
        workers.add(new Worker(5));
        // filter(Worker::finish) blocks until each evaluated Worker has finished
        Worker first = workers.parallelStream().filter(Worker::finish).findFirst().orElse(null);
        if (first != null) {
            System.out.println("id : " + first.id);
        }
    }

    static class Worker {

        int id;

        Worker(int id) {
            this.id = id;
        }

        // Simulates a job that takes id seconds, then reports success
        boolean finish() {
            int t = id * 1000;
            System.out.println(id + " -> " + t);
            try {
                Thread.sleep(t);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag instead of swallowing it
            }
            return true;
        }

    }

}

Is there any way to achieve this using java.util.stream?

Thanks.

4 answers
走好不送
Answer #2 · 2019-06-22 09:31

When you use your finish method as the filter of the Stream, it means that in order to evaluate the filter's predicate for a specific Worker, the Worker has to finish its work.

When you run this code as a parallel Stream, however, the filter may be applied to multiple Workers at the same time, in which case you can get a result without waiting for every Worker. Note, though, that findFirst still returns the first matching Worker in encounter order, not the first one to finish; findAny may return whichever match is found first. Moreover, you have no control over how many threads the parallel Stream will use. It may decide that some of the Workers should be processed on the same thread, in which case some of them won't be processed at all (since your terminal operation requires only one Worker to finish its processing).

Therefore, if your goal is to run finish for all Workers at the same time, you can't rely on a Stream (not even a parallel Stream).
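One subtlety worth checking: on an ordered source such as a List, findFirst() returns the first matching element in encounter order even on a parallel stream, so it cannot return "whoever finishes first". The sketch below reverses the sleep times so the last element finishes first; finish(int) is a hypothetical stand-in for the question's Worker.finish(), not the original method.

```java
import java.util.Arrays;
import java.util.List;

public class FindFirstOrderDemo {

    // Hypothetical stand-in for Worker.finish(): the sleep is reversed so that
    // id 5 completes first in wall-clock time and id 1 completes last.
    static boolean finish(int id) {
        try {
            Thread.sleep((6 - id) * 50L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
        }
        return true;
    }

    // Returns the id chosen by findFirst() on a parallel stream.
    static int firstMatchId() {
        List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5);
        return ids.parallelStream()
                .filter(FindFirstOrderDemo::finish)
                .findFirst()
                .orElse(-1);
    }

    public static void main(String[] args) {
        // Prints 1: findFirst() honours encounter order, even though id 1 is the slowest.
        System.out.println("findFirst -> " + firstMatchId());
    }
}
```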

啃猪蹄的小仙女
Answer #3 · 2019-06-22 09:40

I know this is an old question, but I found a nice solution here: https://winterbe.com/posts/2015/04/07/java8-concurrency-tutorial-thread-executor-examples/

Under the InvokeAny section:

Another way of batch-submitting callables is the method invokeAny() which works slightly different to invokeAll(). Instead of returning future objects this method blocks until the first callable terminates and returns the result of that callable.

Replace the stream with a collection of callables and pass it to invokeAny(). It looks really clean.
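A minimal sketch of that suggestion, assuming each worker can be wrapped in a Callable<Integer> that returns its id. The worker(...) helper and its sleep durations are hypothetical, chosen to mimic the question's Worker. ExecutorService.invokeAny blocks until one task completes successfully, returns its result, and cancels the tasks that have not completed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class InvokeAnyDemo {

    // Hypothetical worker task: sleeps id * 200 ms, then returns its id.
    static Callable<Integer> worker(int id) {
        return () -> {
            Thread.sleep(id * 200L);
            return id;
        };
    }

    static int firstFinishedId() throws InterruptedException, ExecutionException {
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int id = 5; id >= 1; id--) { // list order deliberately reversed
            tasks.add(worker(id));
        }
        // One thread per task so that all workers really run at the same time.
        ExecutorService pool = Executors.newFixedThreadPool(tasks.size());
        try {
            // Returns the result of the first task to complete successfully;
            // the remaining tasks are cancelled.
            return pool.invokeAny(tasks);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("first finished: " + firstFinishedId());
    }
}
```

The fixed pool with one thread per task is what guarantees every worker starts together; with a smaller pool, later tasks would wait in the queue, which is the same limitation the parallel stream has.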

看我几分像从前
Answer #4 · 2019-06-22 09:46

Instead of using a Stream, you can try Observable from Reactive Extensions for Java (RxJava). Example code below:

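// Assumes RxJava 2 (package io.reactivex); RxJava 3 uses io.reactivex.rxjava3.core instead.
import io.reactivex.Maybe;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.Random;

public class Example {
  public static void main(String[] args) {
    // Each Worker.run(...) is its own Observable; flatMap merges them,
    // and firstElement() keeps whichever worker emits first.
    Maybe<Worker> workerResult = Observable.fromArray(Worker.run(1), Worker.run(2), Worker.run(3), Worker.run(4), Worker.run(5))
        .flatMap(worker -> worker)
        .firstElement();
    workerResult.subscribe(first -> System.out.println("First worker [" + first + "]"));

    // Keep the JVM alive long enough for the computation threads to finish.
    try {
      Thread.sleep(5000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

class Worker {
  private final int id;

  // Wraps a new Worker in an Observable that runs process() on the computation scheduler.
  static Observable<Worker> run(int id) {
    return Observable.just(new Worker(id)).observeOn(Schedulers.computation()).doOnNext(Worker::process);
  }

  private Worker(int id) { this.id = id; }

  public void process() {
    try {
      Thread.sleep(new Random().nextInt(2000));
    } catch (InterruptedException e) {
      // Disposing the remaining sources after firstElement() can interrupt their sleep.
      System.out.println(String.format("[%s] Thread interrupted [%s]", Thread.currentThread(), id));
    }
    System.out.println(String.format("[%s] Worker [%s]", Thread.currentThread(), id));
  }

  @Override
  public String toString() { return "Worker [" + id + "]"; }
}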

The example output:

[Thread[RxComputationThreadPool-2,5,main]] Worker [2]
[Thread[RxComputationThreadPool-1,5,main]] Thread interrupted [1]
[Thread[RxComputationThreadPool-1,5,main]] Worker [1]
[Thread[RxComputationThreadPool-4,5,main]] Thread interrupted [4]
[Thread[RxComputationThreadPool-3,5,main]] Thread interrupted [3]
[Thread[RxComputationThreadPool-3,5,main]] Worker [3]
[Thread[RxComputationThreadPool-4,5,main]] Worker [4]
First worker [Worker [2]]
太酷不给撩
Answer #5 · 2019-06-22 09:49

You seem to have a serious misconception about Streams. Streams are not meant to launch workers. In fact, if you use findFirst, it may happen that no worker except the first one is ever started. So it also doesn’t wait “for all workers to finish”, only for the threads that are currently pending. Since your stream is rather small, it may be the case that all workers have already been started because there are as many threads available in your environment. But this is not guaranteed behavior.

Note that if you use a sequential stream instead of a parallel stream, it will certainly process only the first item (as it returns true) and none of the others. But since the stream implementation can’t predict that result, it will respect your request to “accelerate” the operation via parallel execution and may start processing more items in advance using more threads.
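The difference between the sequential and the parallel case can be observed by counting predicate calls. In this sketch the question's Worker.finish() is replaced by a hypothetical always-true predicate that only increments a counter. The sequential count is always 1; the parallel count depends on the machine and timing, so only its range (1 to 5) is meaningful.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class PredicateCallCount {

    // Runs filter(...).findFirst() over five items whose predicate always
    // matches, and returns how many times the predicate was evaluated.
    static int countEvaluations(boolean parallel) {
        List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5);
        AtomicInteger calls = new AtomicInteger(); // thread-safe for the parallel case
        Stream<Integer> stream = parallel ? ids.parallelStream() : ids.stream();
        stream.filter(id -> {
                    calls.incrementAndGet();
                    return true; // every item "finishes", like Worker.finish()
                })
                .findFirst();
        return calls.get();
    }

    public static void main(String[] args) {
        // Sequential: findFirst stops after the first match
        System.out.println("sequential: " + countEvaluations(false));
        // Parallel: depends on available threads and timing
        System.out.println("parallel:   " + countEvaluations(true));
    }
}
```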
