ForkJoinPool resets thread interrupted state

Posted 2019-06-26 08:02

Question:

I just noticed the following phenomenon when cancelling a Future returned by ForkJoinPool. Given the following example code:

ForkJoinPool pool = new ForkJoinPool();
Future<?> fut = pool.submit(new Callable<Void>() {

  @Override
  public Void call() throws Exception {
    while (true) {
      if (Thread.currentThread().isInterrupted()) { // <-- never true
        System.out.println("interrupted");
        throw new InterruptedException();
      }
    }
  }
});

Thread.sleep(1000);
System.out.println("cancel");
fut.cancel(true);

The program never prints "interrupted". The docs of ForkJoinTask#cancel(boolean) say:

mayInterruptIfRunning - this value has no effect in the default implementation because interrupts are not used to control cancellation.

If ForkJoinTasks ignore interrupts, how else are you supposed to check for cancellation inside Callables submitted to a ForkJoinPool?

Answer 1:

This happens because the Future<?> returned by submit() is a ForkJoinTask.AdaptedCallable, which extends ForkJoinTask. Its cancel method is:

public boolean cancel(boolean mayInterruptIfRunning) {
    return setCompletion(CANCELLED) == CANCELLED;
}

private int setCompletion(int completion) {
    for (int s;;) {
        if ((s = status) < 0)
            return s;
        if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
            if (s != 0)
                synchronized (this) { notifyAll(); }
            return completion;
        }
    }
}

It does not interrupt anything; it only sets the task's status. I suppose this is because a ForkJoinPool's Futures can form a very complicated tree structure, and it is unclear in which order to cancel them.
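Since cancel() only flips the status field, one way for a task to notice cancellation is to poll its own isCancelled() instead of the thread's interrupt flag. The following is a minimal sketch, assuming you can write the work as a ForkJoinTask subclass rather than a plain Callable; the names CancelPollingDemo, PollingTask and cancelAndObserve are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

public class CancelPollingDemo {

    /** Hypothetical task that polls its own status instead of the interrupt flag. */
    static final class PollingTask extends RecursiveAction {
        final CountDownLatch started = new CountDownLatch(1);
        final CountDownLatch sawCancel = new CountDownLatch(1);

        @Override
        protected void compute() {
            started.countDown();
            // cancel() sets the task status, so isCancelled() becomes true
            // even though the worker thread is never interrupted.
            while (!isCancelled()) {
                Thread.yield();
            }
            sawCancel.countDown();
        }
    }

    /** Returns true if the running task noticed that it was cancelled. */
    static boolean cancelAndObserve() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            PollingTask task = new PollingTask();
            pool.submit(task);
            // Wait until the task is actually running before cancelling it.
            if (!task.started.await(5, TimeUnit.SECONDS)) {
                return false;
            }
            task.cancel(true);
            return task.sawCancel.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("task observed cancellation: " + cancelAndObserve());
    }
}
```

If the work has to stay a Callable, the same idea should carry over by sharing a flag (for example an AtomicBoolean) between the caller and the Callable and setting it next to fut.cancel(true).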



Answer 2:

Shedding some more light on top of @Mkhail's answer:

Using ForkJoinPool's execute() instead of submit() forces a failed Runnable to throw a worker exception, which is then caught by the thread's UncaughtExceptionHandler.

From the Java 8 source:
submit() uses AdaptedRunnableAction.
execute() uses RunnableExecuteAction (see the rethrow(ex)).

/**
 * Adaptor for Runnables without results
 */
static final class AdaptedRunnableAction extends ForkJoinTask<Void>
    implements RunnableFuture<Void> {
    final Runnable runnable;
    AdaptedRunnableAction(Runnable runnable) {
        if (runnable == null) throw new NullPointerException();
        this.runnable = runnable;
    }
    public final Void getRawResult() { return null; }
    public final void setRawResult(Void v) { }
    public final boolean exec() { runnable.run(); return true; }
    public final void run() { invoke(); }
    private static final long serialVersionUID = 5232453952276885070L;
}

/**
 * Adaptor for Runnables in which failure forces worker exception
 */
static final class RunnableExecuteAction extends ForkJoinTask<Void> {
    final Runnable runnable;
    RunnableExecuteAction(Runnable runnable) {
        if (runnable == null) throw new NullPointerException();
        this.runnable = runnable;
    }
    public final Void getRawResult() { return null; }
    public final void setRawResult(Void v) { }
    public final boolean exec() { runnable.run(); return true; }
    void internalPropagateException(Throwable ex) {
        rethrow(ex); // rethrow outside exec() catches.
    }
    private static final long serialVersionUID = 5232453952276885070L;
}
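
The difference between the two adaptors can be observed from the outside. Below is a rough sketch, assuming a pool built with the four-argument ForkJoinPool constructor so that a custom UncaughtExceptionHandler can be installed; the class name ExecuteVsSubmitDemo and the method run are invented for this example. With submit() the failure is stored in the task and only surfaces through get(), while with execute() it reaches the handler.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class ExecuteVsSubmitDemo {

    /** Returns { failure seen through Future.get(), failure seen by the handler }. */
    static Throwable[] run() {
        AtomicReference<Throwable> handled = new AtomicReference<>();
        CountDownLatch handlerCalled = new CountDownLatch(1);

        ForkJoinPool pool = new ForkJoinPool(
                2,
                ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                (thread, ex) -> {            // pool-wide UncaughtExceptionHandler
                    handled.set(ex);
                    handlerCalled.countDown();
                },
                false);

        Throwable fromFuture = null;
        try {
            // submit(): the exception is recorded in the returned task.
            Future<?> fut = pool.submit((Runnable) () -> {
                throw new IllegalStateException("from submit");
            });
            try {
                fut.get(5, TimeUnit.SECONDS);
            } catch (ExecutionException e) {
                fromFuture = e.getCause();
            }

            // execute(): no Future to ask, so the failure goes to the handler.
            pool.execute(() -> {
                throw new IllegalStateException("from execute");
            });
            handlerCalled.await(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdownNow();
        }
        return new Throwable[] { fromFuture, handled.get() };
    }

    public static void main(String[] args) {
        Throwable[] seen = run();
        System.out.println("via Future.get(): " + seen[0]);
        System.out.println("via handler:      " + seen[1]);
    }
}
```

Note that the cause reported by get() may be a fresh exception of the same type wrapping the original, so the sketch avoids relying on its message.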