I saw a snippet of code in this question that I could not understand (most probably because I am a beginner in this area). The question talks about "an obvious race condition where sometimes the producer will finish, signal it, and the ConsumerWorkers will stop BEFORE consuming everything in the queue."
In my understanding, "isRunning" will be set to false on the consumers only after the producer decides not to add any more items to the queue. So, if a consumer thread sees isRunning as FALSE AND then sees that inputQueue is empty, there is NO possibility of anything more being added to the queue in the future. Obviously, I am wrong and missing something, as no one who responded to that question said the scenario in the question is impossible. So, can someone please explain what sequence of events causes this race condition?
In fact, I see a different problem. For example, if multiple consumer threads saw that isRunning was still true while the queue held only ONE item, several threads could enter the blocking 'take'. If the producer STOPS now, one thread would come out of the 'take', but the other threads would stay blocked on the 'take' forever. Interestingly, no one who answered the question pointed out this problem either. So, is my understanding of this probably faulty as well?
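To make the scenario I have in mind concrete, here is a minimal sketch of it. The class BlockedTakeDemo, its nested Worker, and the String items are my own hypothetical stand-ins and are not from the original question; the sketch assumes a LinkedBlockingQueue and uses daemon threads only so that the JVM can still exit. If my reading is right, at least one worker should stay blocked in take() after the producer has stopped.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockedTakeDemo {

    // Hypothetical stand-in for ConsumerWorker, with the same loop shape.
    static class Worker implements Runnable {
        private final BlockingQueue<String> inputQueue;
        private volatile boolean isRunning = true;

        Worker(BlockingQueue<String> inputQueue) {
            this.inputQueue = inputQueue;
        }

        void setRunning(boolean isRunning) {
            this.isRunning = isRunning;
        }

        @Override
        public void run() {
            while (isRunning || !inputQueue.isEmpty()) {
                try {
                    inputQueue.take(); // blocks for as long as the queue is empty
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    // Returns how many of the two workers are still alive after the producer has stopped.
    public static int countStuckWorkers() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Worker[] workers = { new Worker(queue), new Worker(queue) };
        Thread[] threads = new Thread[workers.length];
        try {
            for (int i = 0; i < workers.length; i++) {
                threads[i] = new Thread(workers[i]);
                threads[i].setDaemon(true); // a stuck worker must not keep the JVM alive
                threads[i].start();
            }
            // Wait until both workers have passed the isRunning check and are
            // waiting on the empty queue.
            for (Thread t : threads) {
                while (t.getState() != Thread.State.WAITING) {
                    Thread.sleep(1);
                }
            }
            // The producer adds its last item and then signals that it is done.
            queue.put("last item");
            for (Worker w : workers) {
                w.setRunning(false);
            }
            // Only one worker can receive the single item; count who never returns.
            int stuck = 0;
            for (Thread t : threads) {
                t.join(500);
                if (t.isAlive()) {
                    stuck++;
                }
            }
            return stuck;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Workers still blocked: " + countStuckWorkers());
    }
}
```

The count can be one or two depending on timing: the worker that receives the item may re-check the loop condition before the flag is cleared and go back into take().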
I didn't want to add this as a comment on that question, as it is an old question and my doubt might never get answered! I am copy/pasting the code from that question here for quick reference.
import java.util.concurrent.BlockingQueue;

public class ConsumerWorker implements Runnable {

    private BlockingQueue<Produced> inputQueue;
    private volatile boolean isRunning = true;

    public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
        this.inputQueue = inputQueue;
    }

    @Override
    public void run() {
        // worker loop keeps taking an element from the queue as long as the producer is still running or as
        // long as the queue is not empty:
        while (isRunning || !inputQueue.isEmpty()) {
            System.out.println("Consumer " + Thread.currentThread().getName() + " START");
            try {
                Object queueElement = inputQueue.take();
                // process queueElement
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // this is used to signal from the main thread that the producer has finished adding stuff to the queue
    public void setRunning(boolean isRunning) {
        this.isRunning = isRunning;
    }
}