I am new to multi-threaded programming and want to implement the following functionality.
- There are 2 threads, producer and consumer.
- Consumer only processes the latest value, i.e., last in first out (LIFO).
- Producer sometimes generates new values faster than the consumer can process them. For example, the producer may generate 2 new values in 1 millisecond, but the consumer takes approximately 5 milliseconds to process one.
- If consumer receives a new value in the middle of processing an old value, there is no need to interrupt. In other words, consumer will finish current execution first, then start an execution on the latest value.
Here is my design process, please correct me if I am wrong.
- There is no need for a queue, since only the latest value is processed by consumer.
- Are notifications sent from the producer queued automatically?
- I will use a counter instead.
- ConsumerThread() checks the counter at the end, to make sure the producer hasn't generated a new value.
- But what happens if the producer generates a new value just before the consumer goes to sleep(), but after it checks the counter?
Here is some pseudo code.
boost::mutex mutex;
double x;
int counter = 0; // number of values produced since the consumer last reset it

void ProducerThread()
{
    {
        boost::mutex::scoped_lock lock(mutex);
        x = rand();
        counter++;
    }
    notify(); // wake up consumer thread
}

void ConsumerThread()
{
    counter = 0; // reset counter, only process the latest value
    ... do something which takes 5 milliseconds ...
    if (counter > 0)
    {
        ... execute this function again, not too sure how to implement this ...
    }
    else
    {
        ... what happens if producer generates a new value here??? ...
        sleep();
    }
}
Thanks.
The short answer is that you're almost certainly wrong.
With a producer/consumer, you pretty much need a queue between the two threads. Without one there are basically two alternatives: either your code will simply lose tasks (which usually equals not working at all), or your producer thread will need to block until the consumer thread is idle before it can produce an item, which effectively translates to single threading.
For the moment, I'm going to assume that the value you get back from rand is supposed to represent the task to be executed (i.e., is the value produced by the producer and consumed by the consumer). In that case, I'd write the code with a thread-safe queue between the two threads. This relegates nearly all the interlocking to the queue. The rest of the code for both threads pretty much ignores threading issues entirely.
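The code from this answer is not reproduced here, so the following is only a minimal sketch of the idea, assuming C++11 std::thread facilities in place of Boost. TaskQueue, run_queue_demo and the negative end-of-stream marker are hypothetical names and choices made up for illustration.

```cpp
#include <condition_variable>
#include <cstdlib>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical helper: an unbounded queue whose push() and pop() do all the
// locking, so the producer and consumer code needs none of its own.
template <typename T>
class TaskQueue {
public:
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push(std::move(value));
        }
        ready_.notify_one(); // wake one waiting consumer
    }

    // Blocks until an item is available, then removes and returns it.
    T pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return !items_.empty(); });
        T value = std::move(items_.front());
        items_.pop();
        return value;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<T> items_;
};

// Runs one producer and one consumer; returns the values the consumer saw.
// Assumption: rand() never returns a negative value, so -1 can mark the end.
std::vector<double> run_queue_demo(int count) {
    TaskQueue<double> queue;
    std::vector<double> seen;

    std::thread consumer([&] {
        for (;;) {
            double task = queue.pop();
            if (task < 0) break;  // sentinel: producer is done
            seen.push_back(task); // stand-in for the real processing
        }
    });

    std::thread producer([&] {
        for (int i = 0; i < count; ++i)
            queue.push(static_cast<double>(std::rand()));
        queue.push(-1.0);
    });

    producer.join();
    consumer.join();
    return seen;
}
```

Neither thread body touches a mutex directly, which is the point being made: every task pushed is eventually popped, and none is lost.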
If I understood your question correctly, for your particular application, the consumer only needs to process the latest available value provided by the producer. In other words, it's acceptable for values to get dropped because the consumer cannot keep up with the producer.
If that's the case, then I agree that you can get away without a queue and use a counter. However, the shared counter and value variables will need to be accessed atomically.
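As a rough sketch of what that could look like (assuming C++11 std:: equivalents of the Boost types, with LatestValue, publish, take, close and run_latest_demo as made-up names), the value and counter can live behind one mutex, with the consumer sleeping only while the counter is zero:

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper: holds only the most recent value. Older unread values
// are overwritten, so anything the consumer was too slow for is dropped.
class LatestValue {
public:
    void publish(double value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            value_ = value;
            ++counter_; // values published since the last take()
        }
        changed_.notify_one();
    }

    // Tells the consumer that no more values will arrive.
    void close() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        changed_.notify_one();
    }

    // Blocks until a new value exists or the slot is closed.
    // Returns false once closed with nothing left to process.
    bool take(double& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        // The predicate is evaluated with the mutex held, so a publish()
        // cannot slip in between the check and the sleep.
        changed_.wait(lock, [this] { return counter_ > 0 || closed_; });
        if (counter_ == 0) return false;
        out = value_;
        counter_ = 0; // everything older than value_ is discarded
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable changed_;
    double value_ = 0.0;
    int counter_ = 0;
    bool closed_ = false;
};

// Publishes 0 .. count-1, then closes. Returns the last value the consumer
// processed, or -1 if it never processed any.
double run_latest_demo(int count) {
    LatestValue slot;
    double last = -1.0;
    std::thread consumer([&] {
        double v;
        while (slot.take(v)) last = v; // processing runs outside the lock
    });
    for (int i = 0; i < count; ++i) slot.publish(i);
    slot.close();
    consumer.join();
    return last;
}
```

How many intermediate values get skipped depends on timing, but the final published value is always processed, because the counter stays above zero until the consumer takes it.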
You can use boost::condition_variable to notify the consumer that a new value is ready. The consumer waits on it while holding the mutex and rechecks the counter when it wakes, which closes the race described in the question.
ADDENDUM
I was thinking about this question recently, and realized that this solution, while it may work, is not optimal. Your producer is using all that CPU just to throw away half of the computed values.
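I suggest that you reconsider your design and go with a bounded blocking queue between the producer and consumer. Such a queue should have the following characteristics:
- It is thread-safe.
- It has a fixed capacity (bounded).
- A pop on an empty queue blocks the consumer until the producer adds an item.
- A push on a full queue blocks the producer until the consumer frees a slot.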
With this type of queue, you can effectively throttle down the producer so that it doesn't outpace the consumer. It also ensures that the producer doesn't waste CPU resources computing values that will be thrown away.
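To make the throttling concrete, here is a minimal sketch of such a queue, assuming C++11 std:: primitives rather than Boost. BoundedQueue and run_bounded_demo are hypothetical names, and the capacity of 2 is an arbitrary choice for the demo.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical helper: a fixed-capacity queue. push() blocks while the queue
// is full and pop() blocks while it is empty. Capacity must be at least 1.
template <typename T>
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {}

    void push(T value) {
        std::unique_lock<std::mutex> lock(mutex_);
        // The producer sleeps here whenever it gets ahead of the consumer.
        not_full_.wait(lock, [this] { return items_.size() < capacity_; });
        items_.push(std::move(value));
        lock.unlock();
        not_empty_.notify_one();
    }

    T pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_.wait(lock, [this] { return !items_.empty(); });
        T value = std::move(items_.front());
        items_.pop();
        lock.unlock();
        not_full_.notify_one(); // a slot is free again
        return value;
    }

private:
    const std::size_t capacity_;
    std::mutex mutex_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
    std::queue<T> items_;
};

// Pushes 1 .. count through a queue of capacity 2 and returns the sum of
// everything the consumer received.
long run_bounded_demo(int count) {
    BoundedQueue<int> queue(2);
    long sum = 0;
    std::thread consumer([&] {
        for (int i = 0; i < count; ++i) sum += queue.pop();
    });
    for (int i = 1; i <= count; ++i) queue.push(i);
    consumer.join();
    return sum;
}
```

Unlike the latest-value approach, nothing is dropped here: the producer simply waits in push() until the consumer catches up.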
Libraries such as TBB and PPL provide implementations of concurrent queues. If you want to attempt to roll your own using std::queue (or boost::circular_buffer) and boost::condition_variable, check out this blogger's example.
Implementing a pipeline is actually quite tricky if you are doing it from the ground up. For example, you'd have to use a condition variable to avoid the kind of race condition you described in your question, avoid busy waiting when implementing the mechanism for "waking up" the consumer, and so on. Even using a "queue" of just 1 element won't save you from some of these complexities.
It's usually much better to use specialized libraries that were developed and extensively tested specifically for this purpose. If you can live with a Visual C++-specific solution, take a look at the Parallel Patterns Library and its concept of pipelines.