Let us suppose that I have a thread that consumes items produced by another thread. Its run method is as follows, with inQueue being a BlockingQueue
boolean shutdown = false;
while (!shutdown) {
try {
WorkItem w = inQueue.take();
w.consume();
} catch (InterruptedException e) {
shutdown = true;
}
}
Furthermore, a different thread will signal that there are no more work items by interrupting this running thread. Will take() throw an interrupted exception if it does not need to block to retrieve the next work item. i.e. if the producer signals that it is done filling the work queue, is it possible to accidentally leave some items in inQueue or miss the interrupt?
A good way to signal termination of a blocking queue is to submit a 'poison' value into the queue that indicates a shutdown has occurred. This ensures that the expected behavior of the queue is honored. Calling Thread.interupt() is probably not a good idea if you care about clearing the queue.
To provide some code:
You can't in general interrupt the threads of an
ExecutorService
from external code if you usedExecutorService::execute(Runnable)
to start the threads, because external code does not have a reference to theThread
objects of each of the running threads (see the end of this answer for a solution though, if you needExecutorService::execute
). However, if you instead useExecutorService::submit(Callable<T>)
to submit the jobs, you get back aFuture<T>
, which internally keeps a reference to the running thread onceCallable::call()
begins execution. This thread can be interrupted by callingFuture::cancel(true)
. Any code within (or called by) theCallable
that checks the current thread's interrupt status can therefore be interrupted via theFuture
reference. This includesBlockingQueue::take()
, which, even when blocked, will respond to thread interruption. (JRE blocking methods will typically wake up if interrupted while blocked, realize they have been interrupted, and throw anInterruptedException
.)To summarize:
Future::cancel()
andFuture::cancel(true)
both cancel future work, whileFuture::cancel(true)
also interrupts ongoing work (as long as the ongoing work responds to thread interrupt). Neither of the twocancel
invocations affects work that has already successfully completed.Note that once a thread is interrupted by cancellation, an
InterruptException
will be thrown within the thread (e.g. byBlockingQueue::take()
in this case). However, you aCancellationException
will be thrown back in the main thread the next time you callFuture::get()
on a successfully cancelledFuture
(i.e. aFuture
that was cancelled before it completed). This is different from what you would normally expect: if a non-cancelledCallable
throwsInterruptedException
, the next call toFuture::get()
will throwInterruptedException
, but if a cancelledCallable
throwsInterruptedException
, the next call toFuture::get()
will throughCancellationException
.Here's an example that illustrates this:
Each time you run this, the output will be different, but here's one run:
This shows that Thread 2 and Thread 3 completed before
Future::cancel()
was called. Thread 1 was cancelled, so internallyInterruptedException
was thrown, and externallyCancellationException
was thrown. Thread 0 was cancelled before it started running. (Note that the thread indices won't in general correlate with theFuture
indices, soFuture 0 was cancelled
could correspond to either thread 0 or thread 1 being cancelled, and the same forFuture 1 was cancelled
.)Advanced: one way to achieve the same effect with
Executor::execute
(which does not return aFuture
reference) rather thanExecutor::submit
would be to create aThreadPoolExecutor
with a customThreadFactory
, and have yourThreadFactory
record a reference in a concurrent collection (e.g. a concurrent queue) for every thread created. Then to cancel all threads, you can simply callThread::interrupt()
on all previously-created threads. However, you will need to deal with the race condition that new threads may be created while you are interrupting existing threads. To handle this, set anAtomicBoolean
flag, visible to theThreadFactory
, that tells it not to create any more threads, then once that is set, cancel the existing threads.The java.concurrency.utils package was designed and implemented by some of the finest minds in concurrent programming. Also, interrupting threads as a means to terminate them is explicitly endorsed by their book "Java Concurrency in Practice". Therefore, I would be extremely surprised if any items were left in the queue due to an interrupt.
I wondered about the same thing and reading the javadoc for take() I believed that it would throw an interrupted exception only after having taken all the items in the queue, since if the queue had items, it would not have to "wait". But I made a small test:
The runner will take 10-11 items and then finish i.e. take() will throw InterruptedException even if there still is items in the queue.
Summary: Use the Poison pill approach instead, then you have full control over how much is left in the queue.
According to javadoc, the
take()
method will throwInterruptedException
if interrupted while waiting.