How do Java 8 parallel streams behave on a thrown exception in the consuming clause, for example in forEach
handling? For example, the following code:
final AtomicBoolean throwException = new AtomicBoolean(true);
IntStream.range(0, 1000)
.parallel()
.forEach(i -> {
// Throw only on one of the threads.
if (throwException.compareAndSet(true, false)) {
throw new RuntimeException("One of the tasks threw an exception. Index: " + i);
});
Does it stop the handled elements immediately? Does it wait for the already started elements to finish? Does it wait for all the stream to finish? Does it start handling stream elements after the exception is thrown?
When does it return? Immediately after the exception? After all/part of the elements were handled by the consumer?
Do elements continue being handled after the parallel stream threw the exception? (Found a case where this happened).
Is there a general rule here?
EDIT (15-11-2016)
Trying to determine if the parallel stream returns early, I found that it's not determinate:
@Test
public void testParallelStreamWithException() {
AtomicInteger overallCount = new AtomicInteger(0);
AtomicInteger afterExceptionCount = new AtomicInteger(0);
AtomicBoolean throwException = new AtomicBoolean(true);
try {
IntStream.range(0, 1000)
.parallel()
.forEach(i -> {
overallCount.incrementAndGet();
afterExceptionCount.incrementAndGet();
try {
System.out.println(i + " Sleeping...");
Thread.sleep(1000);
System.out.println(i + " After Sleeping.");
}
catch (InterruptedException e) {
e.printStackTrace();
}
// Throw only on one of the threads and not on main thread.
if (!Thread.currentThread().getName().equals("main") && throwException.compareAndSet(true, false)) {
System.out.println("Throwing exception - " + i);
throw new RuntimeException("One of the tasks threw an exception. Index: " + i);
}
});
Assert.fail("Should not get here.");
}
catch (Exception e) {
System.out.println("Cought Exception. Resetting the afterExceptionCount to zero - 0.");
afterExceptionCount.set(0);
}
System.out.println("Overall count: " + overallCount.get());
System.out.println("After exception count: " + afterExceptionCount.get());
}
Late return when throwing not from the main thread. This caused a lot of new elements to be handled way after the exception was thrown. On my machine, about 200 elements were handled after the exception was thrown. BUT, not all 1000 elements were handled. So what's the rule here? Why more elements were handled even though the exception was thrown?
Early return when removing the not (!
) sign, causing the exception to be thrown in the main thread. Only the already started elements finished processing and no new ones were handled. Returning early was the case here. Not consistent with the previous behavior.
What am I missing here?