Suppose we have a list of workers like this:
List<Worker> workers = new ArrayList<>();
workers.add(new Worker(1));
workers.add(new Worker(2));
workers.add(new Worker(3));
workers.add(new Worker(4));
workers.add(new Worker(5));
I want to find the first worker to finish its job, so:
Worker first = workers.parallelStream().filter(Worker::finish).findFirst().orElse(null);
But there's a problem: I don't want to wait for all workers to finish their jobs and then pick the first one. I want the first worker as soon as it has finished its job!
import java.util.ArrayList;
import java.util.List;

public class Test {
    public static void main(String[] args) {
        List<Worker> workers = new ArrayList<>();
        workers.add(new Worker(1));
        workers.add(new Worker(2));
        workers.add(new Worker(3));
        workers.add(new Worker(4));
        workers.add(new Worker(5));
        Worker first = workers.parallelStream().filter(Worker::finish).findFirst().orElse(null);
        if (first != null) {
            System.out.println("id : " + first.id);
        }
    }

    static class Worker {
        int id;

        Worker(int id) {
            this.id = id;
        }

        // Simulates the job: blocks for id seconds, then reports success.
        boolean finish() {
            int t = id * 1000;
            System.out.println(id + " -> " + t);
            try {
                Thread.sleep(t);
            } catch (InterruptedException ignored) {
            }
            return true;
        }
    }
}
Is there any way to achieve this using java.util.stream.Stream?
Thanks.
When you use your finish method as the filter of the Stream, it means that in order to evaluate the filter's predicate for a specific Worker, that Worker has to finish its work.
When you run this code as a parallel Stream, however, the filter may be applied to multiple Workers at the same time, in which case the first one to finish gives you the output. However, you have no control over how many threads the parallel Stream will use. It may decide that some of the Workers should be processed on the same thread, in which case some of them won't be processed at all (since your terminal operation only requires one Worker to finish its processing).
Therefore, if your goal is that finish is executed for all Workers at the same time, you can't use a Stream (not even a parallel Stream).

I know this is an old question, but I found a nice solution here: https://winterbe.com/posts/2015/04/07/java8-concurrency-tutorial-thread-executor-examples/
See the InvokeAny section: change the stream to a collection of callables and submit them to an executor. It looks really clean.
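As a rough illustration of the invokeAny idea, here is a minimal sketch using only java.util.concurrent. The class name InvokeAnyDemo and the helpers worker and firstFinished are made up for this example, and the sleep is shortened to id * 100 ms; it is not the code from the linked tutorial.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class InvokeAnyDemo {

    // Hypothetical stand-in for the question's Worker: sleeps id * 100 ms, then returns its id.
    static Callable<Integer> worker(int id) {
        return () -> {
            Thread.sleep(id * 100L);
            return id;
        };
    }

    // Starts all workers at once and returns the id of the first one to complete.
    static int firstFinished(int workerCount) {
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int id = 1; id <= workerCount; id++) {
            tasks.add(worker(id));
        }
        // One thread per worker, so no worker has to queue behind another.
        ExecutorService executor = Executors.newFixedThreadPool(workerCount);
        try {
            // invokeAny blocks until one task completes successfully;
            // tasks that have not completed by then are cancelled.
            return executor.invokeAny(tasks);
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("id : " + firstFinished(5));
    }
}
```

Unlike the parallel stream, the pool size here is chosen explicitly, so every worker gets its own thread. With these sleep times worker 1 should normally be the one returned, although the result ultimately depends on thread scheduling.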
Instead of using Stream, you can try Observable from Reactive Extensions for Java (RxJava). The example code is below.
The example output:
You seem to have a serious misconception about Stream. Streams are not meant to launch workers. In fact, if you use findFirst, it may happen that it starts no worker other than the first one. So it also doesn't wait "for all workers to finish", but only for the currently pending threads. Since you have a rather small stream, it might be the case that all workers have been started already because there are as many threads available in your environment. But this is not guaranteed behavior.
Note that if you use a sequential stream instead of a parallel stream, it will for sure process the first item only (as it returns true) and none of the others. But since the stream implementation can't predict that result, it will respect your request to "accelerate" the operation via parallel execution and may start processing more items in advance using more threads.