I have a class that takes objects from a BlockingQueue and processes them by calling take() in a continuous loop. At some point I know that no more objects will be added to the queue. How do I interrupt the take() method so that it stops blocking?
Here's the class that processes the objects:
import java.util.concurrent.BlockingQueue;

public class MyObjHandler implements Runnable {
    private final BlockingQueue<MyObj> queue;

    public MyObjHandler(BlockingQueue<MyObj> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                // take() blocks until an object is available
                MyObj obj = queue.take();
                // process obj here
                // ...
            }
        } catch (InterruptedException e) {
            // restore the interrupt flag and let run() return
            Thread.currentThread().interrupt();
        }
    }
}
And here's the method that uses this class to process objects:
public void testHandler() throws InterruptedException {
    BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
    MyObjHandler handler = new MyObjHandler(queue);
    new Thread(handler).start();

    // get objects for handler to process
    for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
        queue.put(i.next());
    }

    // what code should go here to tell the handler
    // to stop waiting for more objects?
}
Very late, but I hope this helps others too. I faced a similar problem and used the poll approach suggested by erickson above, with some minor changes. This solved both problems; in particular, the handler reading from the BlockingQueue knows that it no longer has to wait for more elements.

However, if you interrupt the thread, it might be interrupted while there are still items in the queue waiting to be processed. You might want to consider using poll instead of take, which allows the processing thread to time out and terminate when it has waited for a while with no new input.

Or don't interrupt; it's nasty. [...] queue.notify(); if it ends, call queue.done().
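The poll approach mentioned above could look roughly like the sketch below. It is only an illustration under stated assumptions: the class name PollingHandlerSketch, the drain method, the String element type, and the 200 ms timeout are all made up for this example and do not come from any of the answers. The timed poll returns null when nothing arrives in time, and the loop treats that as the signal to stop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: consume with a timed poll instead of a blocking take().
final class PollingHandlerSketch {

    // Processes items until no item arrives within timeoutMillis, then returns.
    static List<String> drain(BlockingQueue<String> queue, long timeoutMillis)
            throws InterruptedException {
        List<String> processed = new ArrayList<>();
        while (true) {
            // poll(timeout, unit) returns null if the wait elapses with no element
            String item = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (item == null) {
                return processed; // timed out: assume the producer is finished
            }
            processed.add(item); // stand-in for "process obj here"
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
        queue.put("a");
        queue.put("b");
        queue.put("c");
        System.out.println(drain(queue, 200)); // prints [a, b, c]
    }
}
```

The trade-off is that a slow producer can be mistaken for a finished one, so the timeout has to be chosen with the producer's worst-case delay in mind.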
If interrupting the thread is not an option, another approach is to place a "marker" or "command" object on the queue; MyObjHandler recognizes it as such and breaks out of the loop.
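A minimal sketch of the marker-object idea follows, assuming String elements in place of MyObj. The names MarkerObjectSketch, Handler, STOP, and runDemo are hypothetical and chosen only for this example. Because the marker is queued behind the real items, everything put before it is processed before the loop exits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch: stop the consumer by queueing a recognizable marker object.
final class MarkerObjectSketch {

    // Marker instance, compared by identity so it cannot collide with real data.
    static final String STOP = new String("STOP");

    static final class Handler implements Runnable {
        private final BlockingQueue<String> queue;
        final List<String> processed = new ArrayList<>();

        Handler(BlockingQueue<String> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    String item = queue.take();
                    if (item == STOP) {
                        break; // marker seen: no more objects will arrive
                    }
                    processed.add(item); // stand-in for "process obj here"
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    static List<String> runDemo() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
        Handler handler = new Handler(queue);
        Thread worker = new Thread(handler);
        worker.start();
        queue.put("a");
        queue.put("b");
        queue.put(STOP); // tell the handler to stop after the items above
        worker.join();   // after join, reading handler.processed is safe
        return handler.processed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo()); // prints [a, b]
    }
}
```

With several consumer threads on one queue, one marker per consumer would be needed, since each take() removes the marker it sees.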
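Interrupt the thread by calling interrupt() on it; the blocked take() call then throws InterruptedException, which the run() method above already catches.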
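As a rough sketch of the interrupt approach, assuming the caller keeps a reference to the worker thread (the question's code discards it with new Thread(handler).start()): the names InterruptSketch and runDemo are hypothetical, and String stands in for MyObj. The worker ends either because take() is interrupted while blocked or because the interrupt flag is already set when take() is entered.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch: stop a consumer blocked in take() by interrupting its thread.
final class InterruptSketch {

    // Returns true if the worker thread terminated after being interrupted.
    static boolean runDemo() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    queue.take(); // blocks; throws InterruptedException on interrupt
                }
            } catch (InterruptedException e) {
                // restore the flag and fall out of run()
                Thread.currentThread().interrupt();
            }
        });
        worker.start();

        worker.interrupt(); // no more objects will be added
        worker.join(5000);  // wait up to 5 s for the worker to finish
        return !worker.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("worker stopped: " + runDemo());
    }
}
```

Note that this sketch interrupts an empty queue; if items were still queued, they could be left unprocessed.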