I am trying to implement a simple producer/consumer system in Java 11. Basically, I take two threads for each, plus a global queue, simply as follows:
- A global priority queue.
- The first thread, producer, runs a HTTP server, listens to incoming http messages, and upon receiving a message,
pushes
it as a job to the queue (queue.size
increments) - The second thread, the consumer, continously
peeks
the queue. If there is a job (job ! = null
), submits a HTTP request somewhere and upon successful receipt, polls it from the queue (queue.size()
decrements).
The skeleton is as below:
Main Class:
public class Manager
{
private Consumer consumer;
private Producer producer;
Queue queue;
public static void main (String args[])
{
consumer = new Consumer();
producer = new Producer();
}
}
Producer class:
public class Producer implements Runnable
{
public Producer()
{
Thread producer = new Thread(this);
producer.start();
}
public void run()
{
//HTTP server starts, listens, and adds to the queue upon receiving a Job
server.start();
Manager.queue.add(new Job());
}
}
Consumer class:
public class Consumer implements Runnable
{
public Consumer()
{
Thread consumer = new Thread(this);
consumer.start();
}
public void run()
{
// Thread.sleep(1);
while(true)
{
//get an object off the queue
Job job= Manager.queue.peek();
//do some stuff with the object
}
}
}
Producer
and queue
works- all good. But the problem is with the Consumer
. The Consumer code above (with while(true)
loop) doesn't peek the item. But when I add a Thread.sleep(x)
before while(true)
loop, even if x=1 ms
, it works, and grabs the item successfully.
What is the problem? Theoretically, while(true)
loop shouldn't be a problem! Why it can not see and peek
the item?!
The cause of the problem: non-synchronized reading and writing from and to a queue.
What happens here is that both threads, running on different CPU-cores work with their own copy of the queue, thus the producer might be adding stuff and these changes probably even get propagated into RAM, but the consumer never checks for anything in RAM, since it has it's own cached copy of that queue, witch stays empty.
The
Thread.sleep()
thing works, because when waking up, the thread has to get all ist stuff from RAM, where it probably changed.The correct way of doing it, is only accessing the Queue, when synchronized on it as follows:
In producer:
and in Consumer:
And as a final touch: the
while (true)
-thing is incredibly inefficient, you could do something usingObject.wait()
andObject.notify()
In producer:
and in Consumer:
PriorityQueue
is not thread-safe whereasPriorityBlockingQueue
is. As long as you aren't using any methods defined in theBlockingQueue
interface, these two implementations are interchangeable. Simply changingPriorityQueue
toPriorityBlockingQueue
should fix your problem.