Consumer/producer in C++

Posted 2019-01-24 19:09

This is the classic producer-consumer problem, where some threads produce data while others read it. The producers and the consumer share a fixed-size buffer. If the buffer is empty, the consumer has to wait; if it is full, the producers have to wait. I am using semaphores to keep track of the free and filled slots: a producer decrements the free-slots semaphore, adds a value, and increments the filled-slots semaphore.

I am trying to implement a program that gets some numbers from a generator function and then prints their average. By treating this as a producer-consumer problem, I hope to reduce the program's execution time. The generateNumber function introduces a delay, so I want to create several threads that generate numbers and put them into a queue. The "main thread", which runs the main function, then reads from the queue and computes the sum and the average. Here is what I have so far:

#include <cstdio> 
#include <cstdlib>
#include <time.h>
#include "Thread.h" 
#include <queue> 

int generateNumber() {
    int delayms = rand() / (float) RAND_MAX * 400.f + 200;
    int result = rand() / (float) RAND_MAX * 20;
    struct timespec ts;
    ts.tv_sec = 0;
    ts.tv_nsec = delayms * 1000000;
    nanosleep(&ts, NULL);
    return result; }


struct threadarg {
    Semaphore filled(0);
    Semaphore empty(n);
    std::queue<int> q; };


void* threadfunc(void *arg) {
    threadarg *targp = (threadarg *) arg;
    threadarg &targ = *targp;
    while (targ.empty.value() != 0) {
        int val = generateNumber();
        targ.empty.dec(); 
        q.push_back(val);
        targ.filled.inc(); }
}
int main(int argc, char **argv) {
    Thread consumer, producer;
    // read the command line arguments
    if (argc != 2) {
        printf("usage: %s [nums to average]\n", argv[0]);
        exit(1); }
    int n = atoi(argv[1]);
    // Seed random number generator
    srand(time(NULL));
}

I am a bit confused now because I am not sure how to create multiple producer threads that generate numbers (while the queue is not full) while the consumer reads from the queue (while it is not empty). I am not sure what to put in main to implement this. Also, "Thread.h" lets you create a thread, a mutex, or a semaphore. A thread has the methods .run(threadFunc, arg), .join(), etc. A mutex can be locked or unlocked. All of the semaphore methods already appear in my code.

4 answers
啃猪蹄的小仙女
#2 · 2019-01-24 19:19

When managing shared state like this, you need a condition variable and a mutex. The basic pattern is a function along the lines of:

ScopedLock l( theMutex );
while ( !conditionMet ) {
    theCondition.wait( theMutex );
}
doWhatever();
theCondition.notify();

In your case, I'd probably make the condition variable and the mutex members of the class implementing the queue. To write, the conditionMet would be !queue.full(), so you'd end up with something like:

ScopedLock l( queue.myMutex );
while ( queue.full() ) {
    queue.myCondition.wait( queue.myMutex );
}
queue.insert( whatever );
queue.myCondition.notify();

and to read:

ScopedLock l( queue.myMutex );
while ( queue.empty() ) {
    queue.myCondition.wait( queue.myMutex );
}
results = queue.extract();
queue.myCondition.notify();
return results;

Depending on the threading interface, there may be two notify functions: notify one (which wakes up a single thread) and notify all (which wakes up all of the waiting threads). In this case you'll need notify all, because producers and consumers wait on the same condition variable and notify one could wake a thread of the wrong kind. Alternatively, use two condition variables, one for space to write and one for something to read, with each function waiting on one and notifying the other.
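
The two-condition-variable variant mentioned in this answer might look like the following minimal sketch. It assumes C++11 std::mutex and std::condition_variable rather than the primitives in "Thread.h"; BoundedQueue, put, take, and sum_through_queue are hypothetical names chosen for illustration.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical bounded queue: one mutex, two condition variables.
template <typename T>
class BoundedQueue {
    std::queue<T> items_;
    const std::size_t capacity_;
    std::mutex mutex_;
    std::condition_variable not_full_;   // producers wait here for space
    std::condition_variable not_empty_;  // consumers wait here for data

public:
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }

    void put(const T& value) {
        std::unique_lock<std::mutex> lock(mutex_);
        while (items_.size() == capacity_)  // re-check after every wakeup
            not_full_.wait(lock);
        items_.push(value);
        not_empty_.notify_one();            // exactly one new item to read
    }

    T take() {
        std::unique_lock<std::mutex> lock(mutex_);
        while (items_.empty())
            not_empty_.wait(lock);
        T value = items_.front();
        items_.pop();
        not_full_.notify_one();             // exactly one new free slot
        return value;
    }
};

// Hypothetical driver: `producers` threads each put `per_producer` ones;
// the calling thread takes every item and returns the sum.
int sum_through_queue(int producers, int per_producer, std::size_t capacity) {
    BoundedQueue<int> queue(capacity);
    std::vector<std::thread> threads;
    for (int p = 0; p < producers; ++p)
        threads.emplace_back([&queue, per_producer] {
            for (int i = 0; i < per_producer; ++i) queue.put(1);
        });
    int sum = 0;
    for (int i = 0; i < producers * per_producer; ++i) sum += queue.take();
    for (auto& t : threads) t.join();
    return sum;
}
```

Because each condition variable has only one kind of waiter, notify_one is enough here: a producer can only wake a consumer, and a consumer can only wake a producer.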

放我归山
#3 · 2019-01-24 19:24

Your queue is not synchronized, so multiple producers could call push_back at the same time, or a producer could push while the consumer is popping from the front. Either case is a data race that can corrupt the queue.

The simple approach to making this work is to use a thread-safe queue, which can be a wrapper around the std::queue you already have, plus a mutex.

You can start by adding a mutex, and locking/unlocking it around each call you forward to std::queue. For a single consumer that should be sufficient; for multiple consumers you'd need to fuse front() and pop() into a single synchronized call, so that no other consumer can run between the two.

To let the consumer block while the queue is empty, you can add a condition variable to your wrapper.

That should be enough that you can find the answer online - sample code below.


#include <condition_variable>
#include <mutex>
#include <queue>

template <typename T> class SynchronizedQueue
{
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable condvar_;

    typedef std::lock_guard<std::mutex> lock;
    typedef std::unique_lock<std::mutex> ulock;

public:
    void push(T const &val)
    {
        lock l(mutex_); // prevents multiple pushes corrupting queue_
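        // We may need to wake the consumer. Notifying only on the empty-to-non-empty
        // transition assumes a single consumer; with several, notify on every push.
        bool wake = queue_.empty();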
        queue_.push(val);
        if (wake) condvar_.notify_one();
    }

    T pop()
    {
        ulock u(mutex_);
        while (queue_.empty())
            condvar_.wait(u);
        // now queue_ is non-empty and we still have the lock
        T retval = queue_.front();
        queue_.pop();
        return retval;
    }
};

Replace std::mutex et al with whatever primitives your "Thread.h" gives you.
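
To connect this back to the question, here is one possible sketch of several producer threads feeding the main thread, which averages the values. It assumes std::thread in place of the Thread class from "Thread.h", and parallel_average and its generate parameter are hypothetical stand-ins (generate plays the role of generateNumber and must be safe to call from several threads). The queue is a compact copy of the class above so the sketch compiles on its own; it notifies on every push.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Compact copy of the queue from this answer.
template <typename T>
class SynchronizedQueue {
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable condvar_;
public:
    void push(const T& val) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push(val);
        condvar_.notify_one();
    }
    T pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        while (queue_.empty()) condvar_.wait(lock);
        T val = queue_.front();
        queue_.pop();
        return val;
    }
};

// Hypothetical helper: averages `count` values produced by `generate`,
// which is called concurrently from `producers` worker threads.
double parallel_average(int count, int producers, std::function<int()> generate) {
    assert(count > 0 && producers > 0);
    SynchronizedQueue<int> queue;
    std::atomic<int> remaining(count);  // numbers not yet claimed by a producer

    std::vector<std::thread> workers;
    for (int p = 0; p < producers; ++p)
        workers.emplace_back([&] {
            // fetch_sub returns the previous value, so exactly `count` claims succeed.
            while (remaining.fetch_sub(1) > 0) queue.push(generate());
        });

    long sum = 0;
    for (int i = 0; i < count; ++i) sum += queue.pop();  // main thread consumes
    for (auto& w : workers) w.join();
    return static_cast<double>(sum) / count;
}
```

The shared counter is one way to make the producers stop after exactly count numbers in total. This queue is unbounded, so producers never wait for space.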

叛逆
#4 · 2019-01-24 19:26

Protect the queue accesses with a mutex; that should be it. A 'Computer Science 101' bounded producer-consumer queue needs two semaphores (to count the free and filled slots and to give producers and consumers something to wait on, as you are already doing) and one mutex/futex/critical section to protect the queue.

I don't see how replacing the semaphores and mutex with condvars is any great help. What's the point? How do you implement a bounded producer-consumer queue with condvars that works on all platforms with multiple producers/consumers?
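
The two-semaphores-plus-mutex layout described in this answer could be sketched as follows. The real Semaphore interface in "Thread.h" is not shown in the question, so the Semaphore class here is a stand-in built from a mutex and a condition variable that only mirrors the dec() and inc() names; SemaphoreQueue and drain_semaphore_queue are hypothetical names as well.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Stand-in semaphore: dec() blocks while the count is zero,
// inc() raises the count and wakes one waiter.
class Semaphore {
    int count_;
    std::mutex mutex_;
    std::condition_variable changed_;
public:
    explicit Semaphore(int initial) : count_(initial) {}
    void dec() {
        std::unique_lock<std::mutex> lock(mutex_);
        while (count_ == 0) changed_.wait(lock);
        --count_;
    }
    void inc() {
        std::lock_guard<std::mutex> lock(mutex_);
        ++count_;
        changed_.notify_one();
    }
};

class SemaphoreQueue {
    Semaphore free_slots_;    // how many more items fit
    Semaphore filled_slots_;  // how many items are ready to read
    std::mutex mutex_;        // protects items_ itself
    std::queue<int> items_;
public:
    explicit SemaphoreQueue(int capacity)
        : free_slots_(capacity), filled_slots_(0) {
        assert(capacity > 0);
    }
    void put(int value) {
        free_slots_.dec();                 // wait for space
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push(value);
        }
        filled_slots_.inc();               // announce one new item
    }
    int take() {
        filled_slots_.dec();               // wait for an item
        int value;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            value = items_.front();
            items_.pop();
        }
        free_slots_.inc();                 // announce one free slot
        return value;
    }
};

// Hypothetical driver: several producers put ones, the caller sums them.
int drain_semaphore_queue(int producers, int per_producer, int capacity) {
    SemaphoreQueue queue(capacity);
    std::vector<std::thread> threads;
    for (int p = 0; p < producers; ++p)
        threads.emplace_back([&queue, per_producer] {
            for (int i = 0; i < per_producer; ++i) queue.put(1);
        });
    int sum = 0;
    for (int i = 0; i < producers * per_producer; ++i) sum += queue.take();
    for (auto& t : threads) t.join();
    return sum;
}
```

Note that the semaphores are taken before the mutex, never while holding it, so a thread blocked on a full or empty queue does not keep others out of the critical section.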

我只想做你的唯一
#5 · 2019-01-24 19:27

What I would do is this:

  • Make a data class that hides your queue
  • Create thread-safe accessor methods for saving a piece of data to the queue and removing a piece of data from the queue (I would use a single mutex or a critical section for the accessors)
  • Handle the case where a consumer doesn't have any data to work with (sleep)
  • Handle the case where the queue is becoming too full and the producers need to slow down
  • Let the threads go willy-nilly adding and removing as they produce / consume

Also, remember to add a sleep into each and every thread, or else you'll peg the CPU and not give the thread scheduler a good spot to switch contexts and share the CPU with other threads / processes. You don't need to, but it's a good practice.
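
The list above could be sketched roughly like this, as a polling design in which the accessors never block and the threads sleep briefly when they cannot make progress. PolledQueue, try_push, try_pop, and polled_sum are hypothetical names, the one-millisecond nap is an arbitrary choice, and std::thread stands in for the Thread class from "Thread.h".

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical data class: the queue is private and every accessor takes the mutex.
class PolledQueue {
    std::queue<int> items_;
    std::mutex mutex_;
    const std::size_t capacity_;
public:
    explicit PolledQueue(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }
    // Returns false instead of blocking when the queue is full.
    bool try_push(int value) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.size() >= capacity_) return false;
        items_.push(value);
        return true;
    }
    // Returns false instead of blocking when the queue is empty.
    bool try_pop(int& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty()) return false;
        out = items_.front();
        items_.pop();
        return true;
    }
};

// One producer pushes 1..count; the caller pops them all and returns the sum.
int polled_sum(int count, std::size_t capacity) {
    PolledQueue queue(capacity);
    const auto nap = std::chrono::milliseconds(1);
    std::thread producer([&] {
        for (int i = 1; i <= count; ++i)
            while (!queue.try_push(i))          // queue full: slow down
                std::this_thread::sleep_for(nap);
    });
    int sum = 0;
    for (int received = 0; received < count;) {
        int value = 0;
        if (queue.try_pop(value)) {
            sum += value;
            ++received;
        } else {
            std::this_thread::sleep_for(nap);   // nothing to consume yet
        }
    }
    producer.join();
    return sum;
}
```

Polling with sleeps is simple but adds latency of up to one nap per handoff; the blocking designs in the other answers avoid that.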
