C++ multithreading, simple consumer / producer threads

Posted 2019-07-23 04:46

I am new to multithreaded programming, and I want to implement the following functionality:

  1. There are two threads: a producer and a consumer.
  2. The consumer only processes the latest value, i.e., last in, first out (LIFO).
  3. The producer sometimes generates new values faster than the consumer can process them. For example, the producer may generate 2 new values in 1 millisecond, but the consumer takes approximately 5 milliseconds to process one.
  4. If the consumer receives a new value in the middle of processing an old one, there is no need to interrupt it. In other words, the consumer finishes its current execution first, then starts an execution on the latest value.

Here is my design process; please correct me if I am wrong.

  1. There is no need for a queue, since only the latest value is processed by the consumer.
  2. Are notifications sent by the producer queued automatically?
  3. I will use a counter instead.
  4. ConsumerThread() checks the counter at the end to make sure the producer hasn't generated a new value.
  5. But what happens if the producer generates a new value after the consumer checks the counter, but just before it goes to sleep()?

Here is some pseudo code.

boost::mutex mutex;
double x;
int counter = 0;

void ProducerThread() 
{
    {
        boost::mutex::scoped_lock lock(mutex);
        x = rand();
        counter++;
    }
    notify(); // wake up consumer thread
}   

void ConsumerThread()
{
    counter = 0; // reset counter, only process the latest value

... do something which takes 5 milli-seconds ...

    if (counter > 0) 
    {
... execute this function again; not sure how to implement this ...
    } 
    else 
    {
... what happens if the producer generates a new value here? ...
        sleep();
    }
}

Thanks.

3 answers
Explosion°爆炸
Post #2 · 2019-07-23 05:39

The short answer is that you're almost certainly wrong.

With a producer/consumer, you pretty much need a queue between the two threads. Without one, there are basically two alternatives: either your code will simply lose tasks (which usually amounts to not working at all), or your producer thread will need to block until the consumer thread is idle before it can produce an item, which effectively translates to single threading.

For the moment, I'm going to assume that the value you get back from rand is supposed to represent the task to be executed (i.e., is the value produced by the producer and consumed by the consumer). In that case, I'd write the code something like this:

// `queue` is a thread-safe, bounded blocking queue of doubles
void producer() { 
    for (int i=0; i<100; i++)
        queue.insert(random());    // queue.insert blocks if queue is full
    queue.insert(-1.0);            // Tell consumer to exit
}

void consumer() { 
    double value;
    while ((value = queue.get()) != -1) // queue.get blocks if queue is empty
        process(value);
}

This relegates nearly all the interlocking to the queue. The rest of the code for both threads can ignore threading issues almost entirely.

男人必须洒脱
Post #3 · 2019-07-23 05:46

If I understood your question correctly, for your particular application, the consumer only needs to process the latest available value provided by the producer. In other words, it's acceptable for values to get dropped because the consumer cannot keep up with the producer.

If that's the case, then I agree that you can get away without a queue and use a counter. However, the shared counter and value variables will need to be accessed atomically, i.e., read and updated together under the same lock.

You can use boost::condition_variable to notify the consumer that a new value is ready. Here is a complete example; I'll let the comments do the explaining.

#include <cstdlib>   // std::rand
#include <iostream>  // std::cout
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/locks.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>

boost::mutex mutex;
boost::condition_variable condvar;
typedef boost::unique_lock<boost::mutex> LockType;

// Variables that are shared between producer and consumer.
double value = 0;
int count = 0;

void producer()
{
    while (true)
    {
        {
            // value and count must both be updated atomically
            // using a mutex lock
            LockType lock(mutex);
            value = std::rand();
            ++count;

            // Notify the consumer that a new value is ready.
            condvar.notify_one();
        }

        // Simulate a 2 ms delay, exaggerated here to 200 ms
        boost::this_thread::sleep(boost::posix_time::milliseconds(200));
    }
}

void consumer()
{
    // Local copies of 'count' and 'value' variables. We want to do the
    // work using local copies so that they don't get clobbered by
    // the producer when it updates.
    int currentCount = 0;
    double currentValue = 0;

    while (true)
    {
        {
            // Acquire the mutex before accessing 'count' and 'value' variables.
            LockType lock(mutex); // mutex is locked while in this scope
            while (count == currentCount)
            {
                // Wait for producer to signal that there is a new value.
                // While we are waiting, Boost releases the mutex so that
                // other threads may acquire it.
                condvar.wait(lock);
            }

            // `lock` is automatically re-acquired when we come out of
            // condvar.wait(lock). So it's safe to access the 'value'
            // variable at this point.
            currentValue = value; // Grab a copy of the latest value
                                  // while we hold the lock.
            currentCount = count; // Remember which value we took, so the
                                  // next pass waits for a newer one.
        }

        // Now that we are out of the mutex lock scope, we work with our
        // local copy of `value`. The producer can keep on clobbering the
        // 'value' variable all it wants, but it won't affect us here
        // because we are now using `currentValue`.
        std::cout << "value = " << currentValue << "\n";

        // Simulate a 5 ms delay, exaggerated here to 500 ms
        boost::this_thread::sleep(boost::posix_time::milliseconds(500));
    }
}

int main()
{
    boost::thread c(&consumer);
    boost::thread p(&producer);
    c.join();
    p.join();
}

ADDENDUM

I was thinking about this question recently and realized that this solution, while it may work, is not optimal. Your producer is using all that CPU just to throw away most of the values it computes.

I suggest that you reconsider your design and go with a bounded blocking queue between the producer and consumer. Such a queue should have the following characteristics:

  • Thread-safe
  • The queue has a fixed size (bounded)
  • If the consumer wants to pop the next item, but the queue is empty, the operation will be blocked until notified by the producer that an item is available.
  • The producer can check if there's room to push another item and block until the space becomes available.

With this type of queue, you can effectively throttle down the producer so that it doesn't outpace the consumer. It also ensures that the producer doesn't waste CPU resources computing values that will be thrown away.

Libraries such as TBB and PPL provide implementations of concurrent queues. If you want to attempt to roll your own using std::queue (or boost::circular_buffer) and boost::condition_variable, check out this blogger's example.

叼着烟拽天下
Post #4 · 2019-07-23 05:52

Implementing a pipeline from the ground up is actually quite tricky. For example, you'd have to use a condition variable to avoid the kind of race condition you described in your question, avoid busy waiting when implementing the mechanism for "waking up" the consumer, etc. Even using a "queue" of just one element won't save you from some of these complexities.

It's usually much better to use specialized libraries that were developed and extensively tested specifically for this purpose. If you can live with a Visual C++-specific solution, take a look at the Parallel Patterns Library and the concept of pipelines.
