Interrupt parallel Stream execution

Posted 2019-07-20 01:31

Question:

Consider this code:

Thread thread = new Thread(() -> tasks.parallelStream().forEach(Runnable::run));

Here, tasks is a list of Runnables that should be executed in parallel. After we start this thread and it begins executing, we may need to interrupt (cancel) all of those tasks, depending on the outcome of some calculations.

Interrupting the Thread stops only one of the executions. How do we handle the others? Should Streams perhaps not be used this way, or is there a better solution?

Answer 1:

You aren't actually running the Runnables on the Thread you are creating. You are running a thread that submits work to a pool, so for this code:

Thread thread = new Thread(() -> tasks.parallelStream().forEach(Runnable::run));

you are, roughly speaking, doing this:

List<Runnable> tasks = ...;
Thread thread = new Thread(new Runnable(){
    public void run(){
       for(Runnable r : tasks){
          ForkJoinPool.commonPool().submit(r);
       }
    }
});

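This is because a parallelStream delegates to the common ForkJoinPool when it executes in parallel. The thread that invokes the terminal operation also takes part in the work, which is consistent with interrupting it stopping only one of the executions.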
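To see this for yourself, the following minimal sketch records the name of the thread on which each task runs. The class name WhereDoTasksRun, the method runAndCollectThreadNames and the thread name "my-thread" are made up for this illustration and are not part of the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not from the original question.
class WhereDoTasksRun {

    // Runs taskCount trivial tasks through a parallel stream started from a
    // dedicated thread and returns the names of all threads that ran a task.
    static Set<String> runAndCollectThreadNames(int taskCount) {
        Set<String> names = ConcurrentHashMap.newKeySet();

        List<Runnable> tasks = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            // Each task only records the thread it was executed on.
            tasks.add(() -> names.add(Thread.currentThread().getName()));
        }

        Thread thread = new Thread(
                () -> tasks.parallelStream().forEach(Runnable::run), "my-thread");
        thread.start();
        try {
            thread.join(); // wait until the whole stream has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names;
    }

    public static void main(String[] args) {
        runAndCollectThreadNames(1000).forEach(System.out::println);
    }
}
```

On a multi-core machine the printed names typically include common pool workers in addition to "my-thread", but the exact set depends on the JVM, the core count and timing, so no particular output should be relied upon.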

As far as I know, you cannot get a handle on the threads that execute your tasks in a parallelStream, so you may be out of luck. You can always resort to tricks to get hold of those threads, but that probably isn't a good idea.



Answer 2:

You can run the stream inside your own ForkJoinPool and shut that pool down to interrupt the threads:

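// JDK imports used below; the JUnit imports for @Test and assertTrue
// depend on the JUnit version in use and are left out here.
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

@Test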
public void testInterruptParallelStream() throws Exception {
    final AtomicReference<InterruptedException> exc = new AtomicReference<>();

    final ForkJoinPool forkJoinPool = new ForkJoinPool(4);
    // use the pool with a parallel stream to execute some tasks
    forkJoinPool.submit(() -> {
        Stream.generate(Object::new).parallel().forEach(obj -> {
            synchronized (obj) {
                try {
                    // task that is blocking
                    obj.wait();
                } catch (final InterruptedException e) {
                    exc.set(e);
                }
            }
        });
    });

    // wait until the stream got started
    Thread.sleep(500);
    // now we want to interrupt the task execution
    forkJoinPool.shutdownNow();
    // wait for the interrupt to occur
    Thread.sleep(500);
    // check that we really got an interruption in the parallel stream threads
    assertTrue(exc.get() instanceof InterruptedException);
}

The worker threads really do get interrupted, which terminates the blocking operation. You can also call shutdown() from within the Consumer.

Note that the sleeps are not tuned for a proper unit test; you may have a better way to wait only as long as necessary. They are enough, though, to show that this works.
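One possible way to avoid the fixed sleeps is to wait on CountDownLatch instances instead. The sketch below is only an illustration under that assumption: the class name InterruptWithLatches, the method runAndInterrupt and the 5-second timeouts are invented here, and like the test above it relies on the parallel stream running on the workers of the pool it was submitted to, which is an implementation detail rather than a documented guarantee.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

// Hypothetical variant of the test above, using latches instead of sleeps.
class InterruptWithLatches {

    // Returns true if a blocked worker thread observed an InterruptedException.
    static boolean runAndInterrupt() {
        CountDownLatch started = new CountDownLatch(1);      // a task is running
        CountDownLatch interrupted = new CountDownLatch(1);  // a task was interrupted

        ForkJoinPool pool = new ForkJoinPool(4);
        pool.submit(() -> {
            Stream.generate(Object::new).parallel().forEach(obj -> {
                synchronized (obj) {
                    try {
                        started.countDown();
                        obj.wait(); // blocking task
                    } catch (InterruptedException e) {
                        interrupted.countDown();
                    }
                }
            });
        });

        try {
            // Wait until at least one task has started, with a timeout as a safety net.
            boolean running = started.await(5, TimeUnit.SECONDS);
            // Interrupt the worker threads of the pool.
            pool.shutdownNow();
            // Wait until a worker reports the interruption.
            return running && interrupted.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("interrupted: " + runAndInterrupt());
    }
}
```

The latches let the caller proceed as soon as the events it cares about have happened, and the timeouts keep a failing run from hanging forever.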