This is the classic producer-consumer problem, where some threads produce data while others read it. The producers and consumers share a fixed-size buffer. If the buffer is empty, the consumers have to wait, and if it is full, the producers have to wait. I am using semaphores to keep track of the full and empty slots in the queue: a producer decrements the free-slots semaphore, adds a value, and increments the filled-slots semaphore.
I am trying to implement a program that gets some numbers from a generator function and then prints out their average. By treating this as a producer-consumer problem, I hope to save some execution time. The generateNumber function introduces a delay, so I want to create a number of threads that generate numbers and put them into a queue. The "main thread", which runs the main function, then has to read from the queue and compute the sum and the average. Here is what I have so far:
#include <cstdio>
#include <cstdlib>
#include <time.h>
#include "Thread.h"
#include <queue>
int generateNumber() {
    int delayms = rand() / (float) RAND_MAX * 400.f + 200;
    int result = rand() / (float) RAND_MAX * 20;
    struct timespec ts;
    ts.tv_sec = 0;
    ts.tv_nsec = delayms * 1000000;
    nanosleep(&ts, NULL);
    return result;
}
struct threadarg {
    Semaphore filled(0);
    Semaphore empty(n);
    std::queue<int> q;
};
void* threadfunc(void *arg) {
    threadarg *targp = (threadarg *) arg;
    threadarg &targ = *targp;
    while (targ.empty.value() != 0) {
        int val = generateNumber();
        targ.empty.dec();
        q.push_back(val);
        targ.filled.inc();
    }
}
int main(int argc, char **argv) {
    Thread consumer, producer;
    // read the command line arguments
    if (argc != 2) {
        printf("usage: %s [nums to average]\n", argv[0]);
        exit(1);
    }
    int n = atoi(argv[1]);
    // Seed random number generator
    srand(time(NULL));
}
I am a bit confused now because I am not sure how to create multiple producer threads that generate numbers (as long as q is not full) while the consumer reads from the queue (as long as q is not empty). I am not sure what to put in main to implement it. Also, "Thread.h" lets you create a thread, a mutex, or a semaphore. The thread has the methods .run(threadFunc, arg), .join(), etc. A mutex can be locked or unlocked. The semaphore methods have all been used in my code.
When managing shared state like this, you need a condition variable and a mutex. The basic pattern is a function that locks the mutex, waits on the condition variable in a loop until conditionMet is true, modifies the shared state, notifies the waiting threads, and then releases the mutex.
In your case, I'd probably make the condition variable and the mutex members of the class implementing the queue. To write, the conditionMet would be !queue.full(); to read, it would be !queue.empty().
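A minimal sketch of the write and read functions this answer describes, assuming the standard std::mutex and std::condition_variable rather than the primitives in "Thread.h" (the question does not show a condition variable there). The names BoundedQueue, write, read, and sumFromProducers are hypothetical, chosen only for illustration. A single condition variable is shared by readers and writers, so both functions use notify_all.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical bounded queue: the mutex and condition variable are members.
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }

    // Writer: the condition to wait for is "queue is not full".
    void write(int value) {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return items_.size() < capacity_; });
        items_.push(value);
        cond_.notify_all();  // readers and writers share one condvar, so wake all
    }

    // Reader: the condition to wait for is "queue is not empty".
    int read() {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return !items_.empty(); });
        int value = items_.front();
        items_.pop();
        cond_.notify_all();
        return value;
    }

private:
    std::size_t capacity_;
    std::queue<int> items_;
    std::mutex mutex_;
    std::condition_variable cond_;
};

// Hypothetical driver: each producer thread writes `perProducer` ones;
// the calling thread reads every value back and returns the sum.
int sumFromProducers(int producers, int perProducer, std::size_t capacity) {
    BoundedQueue queue(capacity);
    std::vector<std::thread> threads;
    for (int p = 0; p < producers; ++p) {
        threads.emplace_back([&queue, perProducer] {
            for (int i = 0; i < perProducer; ++i) queue.write(1);
        });
    }
    int sum = 0;
    for (int i = 0; i < producers * perProducer; ++i) sum += queue.read();
    for (auto& t : threads) t.join();
    return sum;
}
```

The predicate overload of wait re-checks the condition after every wakeup, which is what makes the shared condition variable safe here: a writer woken by another writer simply goes back to waiting if the queue is still full.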
Depending on the threading interface, there may be two notify functions: notify one (which wakes up a single thread), and notify all (which wakes up all of the waiting threads); in this case, you'll need notify all (or you'll need two condition variables, one for space to write, and one for something to read, with each function waiting on one, but notifying the other).
Your queue is not synchronized, so multiple producers could call push_back at the same time, or at the same time the consumer is calling pop_front ... this will break.
The simple approach to making this work is to use a thread-safe queue, which can be a wrapper around the std::queue you already have, plus a mutex.
You can start by adding a mutex, and locking/unlocking it around each call you forward to std::queue - for a single consumer that should be sufficient, for multiple consumers you'd need to fuse front() and pop_front() into a single synchronized call.
To let the consumer block while the queue is empty, you can add a condition variable to your wrapper.
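As a rough illustration of such a wrapper, here is a sketch built on the standard std::mutex and std::condition_variable; these are assumptions standing in for whatever "Thread.h" offers. SafeQueue and drainTwoProducers are hypothetical names. Note that std::queue itself exposes push() and pop(), so the fused call below combines front() and pop().

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical thread-safe wrapper: every access to the inner std::queue
// happens while holding the mutex.
template <typename T>
class SafeQueue {
public:
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(value));
        }
        notEmpty_.notify_one();  // wake one consumer, if any is waiting
    }

    // front() and pop() fused into one synchronized call, so two consumers
    // can never take the same element. Blocks while the queue is empty.
    T pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        notEmpty_.wait(lock, [this] { return !queue_.empty(); });
        T value = std::move(queue_.front());
        queue_.pop();
        return value;
    }

private:
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable notEmpty_;
};

// Hypothetical driver: two producers each push 1..perProducer; the calling
// thread pops everything and returns the sum.
int drainTwoProducers(int perProducer) {
    assert(perProducer >= 0);
    SafeQueue<int> queue;
    auto produce = [&queue, perProducer] {
        for (int i = 1; i <= perProducer; ++i) queue.push(i);
    };
    std::thread a(produce);
    std::thread b(produce);
    int sum = 0;
    for (int i = 0; i < 2 * perProducer; ++i) sum += queue.pop();
    a.join();
    b.join();
    return sum;
}
```

This wrapper is unbounded: producers never block. Bounding it would need a second wait condition for "not full", or the semaphores the question already uses.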
That should be enough for you to find the answer online. In any sample code, replace std::mutex et al. with whatever primitives your "Thread.h" gives you.
Protect the queue accesses with a mutex, that should be it. A 'Computer Science 101' bounded producer-consumer queue needs two semaphores (to manage the free/empty count and for producers/consumers to wait on, as you are already doing), and one mutex/futex/criticalSection to protect the queue.
I don't see how replacing the semaphores and mutex with condvars is any great help. What's the point? How do you implement a bounded producer-consumer queue with condvars that works on all platforms with multiple producers/consumers?
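The two-semaphores-plus-mutex design from this answer could look roughly like the sketch below. The Semaphore class here is only a stand-in with assumed inc()/dec() semantics (dec() blocks at zero), written so the example compiles without "Thread.h"; the real header's class should be used instead. SharedQueue, produce, consume, and averageOf are hypothetical names, and the way averageOf splits the n numbers across producers is one possible answer to "what to put in main", not the only one.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Stand-in for the Semaphore from "Thread.h" (assumed: dec() blocks at zero).
class Semaphore {
public:
    explicit Semaphore(std::size_t initial) : count_(initial) {}
    void inc() {
        std::lock_guard<std::mutex> lock(m_);
        ++count_;
        cv_.notify_one();
    }
    void dec() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return count_ > 0; });
        --count_;
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    std::size_t count_;
};

struct SharedQueue {
    explicit SharedQueue(std::size_t capacity) : filled(0), empty(capacity) {}
    Semaphore filled;  // number of items ready to read
    Semaphore empty;   // number of free slots
    std::mutex lock;   // protects q itself
    std::queue<int> q;
};

void produce(SharedQueue& s, int value) {
    s.empty.dec();  // wait for a free slot
    { std::lock_guard<std::mutex> g(s.lock); s.q.push(value); }
    s.filled.inc();  // announce one more item
}

int consume(SharedQueue& s) {
    s.filled.dec();  // wait for an item
    int value;
    { std::lock_guard<std::mutex> g(s.lock); value = s.q.front(); s.q.pop(); }
    s.empty.inc();  // announce one more free slot
    return value;
}

// Split n numbers across the producers; the calling thread is the consumer.
double averageOf(int n, int producers, std::size_t capacity, int (*generate)()) {
    assert(n > 0 && producers > 0 && capacity > 0);
    SharedQueue shared(capacity);
    std::vector<std::thread> threads;
    for (int p = 0; p < producers; ++p) {
        int share = n / producers + (p < n % producers ? 1 : 0);
        threads.emplace_back([&shared, share, generate] {
            for (int i = 0; i < share; ++i) produce(shared, generate());
        });
    }
    long sum = 0;
    for (int i = 0; i < n; ++i) sum += consume(shared);
    for (auto& t : threads) t.join();
    return static_cast<double>(sum) / n;
}
```

Giving each producer a fixed share up front means every thread knows when to stop, which avoids using the semaphore's value as a loop condition the way the question's threadfunc does.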
What I would do is this:
Also, remember to add a sleep to any thread that loops without blocking, or else you'll peg the CPU and not give the thread scheduler a good spot to switch contexts and share the CPU with other threads/processes. You don't strictly need to when a thread already blocks on a semaphore, mutex, or condition variable, but it's a good practice.