I'm trying to implement a producer/consumer model multithreaded program in C++ for a project I'm working on. The basic idea is that the main thread creates a second thread to watch a serial port for new data, process the data and put the result in a buffer that is periodically polled by the main thread. I've never written multi-threaded programs before. I've been reading lots of tutorials, but they're all in C. I think I've got a handle on the basic concepts, but I'm trying to c++ify it. For the buffer, I want to create a data class with mutex protection built in. This is what I came up with.
1) Am I going about this the wrong way? Is there a smarter way to implement a protected data class?
2) What will happen in the following code if two threads try to call ProtectedBuffer::add_back() at the same time?
#include <deque>
#include "pthread.h"

template <class T>
class ProtectedBuffer {
    std::deque<T> buffer;
    pthread_mutex_t mutex;
public:
    void add_back(T data) {
        pthread_mutex_lock(&mutex);
        buffer.push_back(data);
        pthread_mutex_unlock(&mutex);
    }
    void get_front(T &data) {
        pthread_mutex_lock(&mutex);
        data = buffer.front();
        buffer.pop_front();
        pthread_mutex_unlock(&mutex);
    }
};
Edit: Thanks for all the great suggestions. I've tried to implement them below. I also added some error checking so if a thread somehow manages to try to lock the same mutex twice it will fail gracefully. I think.
#include "pthread.h"
#include <deque>

// Blocking scoped lock: acquires in the constructor, releases in the destructor.
class Lock {
    pthread_mutex_t &m;
    bool locked;
    int error;
public:
    explicit Lock(pthread_mutex_t & _m) : m(_m) {
        error = pthread_mutex_lock(&m);
        locked = (error == 0);
    }
    ~Lock() {
        if (locked)
            pthread_mutex_unlock(&m);
    }
    bool is_locked() const {
        return locked;
    }
};

// Non-blocking scoped lock: is_locked() is false if the mutex was already held.
class TryToLock {
    pthread_mutex_t &m;
    bool locked;
    int error;
public:
    explicit TryToLock(pthread_mutex_t & _m) : m(_m) {
        error = pthread_mutex_trylock(&m);
        locked = (error == 0);
    }
    ~TryToLock() {
        if (locked)
            pthread_mutex_unlock(&m);
    }
    bool is_locked() const {
        return locked;
    }
};

template <class T>
class ProtectedBuffer {
    pthread_mutex_t mutex;
    pthread_mutexattr_t mattr;
    std::deque<T> buffer;
    bool failbit;
    // Non-copyable: declared private and left undefined.
    ProtectedBuffer(const ProtectedBuffer& x);
    ProtectedBuffer& operator= (const ProtectedBuffer& x);
public:
    ProtectedBuffer() {
        pthread_mutexattr_init(&mattr);
        pthread_mutexattr_settype(&mattr, PTHREAD_MUTEX_ERRORCHECK);
        pthread_mutex_init(&mutex, &mattr);
        failbit = false;
    }
    ~ProtectedBuffer() {
        pthread_mutex_destroy(&mutex);
        pthread_mutexattr_destroy(&mattr);
    }
    void add_back(const T &data) {
        Lock lck(mutex);
        if (!lck.is_locked()) {
            failbit = true;
            return;
        }
        buffer.push_back(data);
        failbit = false;
    }
    void get_front(T &data) {
        Lock lck(mutex);
        if (!lck.is_locked()) {
            failbit = true;
            return;
        }
        if (buffer.empty()) {
            failbit = true;
            return;
        }
        data = buffer.front();
        buffer.pop_front();
        failbit = false;
    }
    void try_get_front(T &data) {
        TryToLock lck(mutex);
        if (!lck.is_locked()) {
            failbit = true;
            return;
        }
        if (buffer.empty()) {
            failbit = true;
            return;
        }
        data = buffer.front();
        buffer.pop_front();
        failbit = false;
    }
    void try_add_back(const T &data) {
        TryToLock lck(mutex);
        if (!lck.is_locked()) {
            failbit = true;
            return;
        }
        buffer.push_back(data);
        failbit = false;
    }
};
You have the basics, but I would take it a step further by wrapping the mutex itself in its own RAII wrapper, eg:
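The code that went with this suggestion is not shown here, so the following is only a rough sketch of what such a wrapper might look like. The class names Mutex and MutexLock and the bool return from get_front are my own assumptions, not something from the question.

```cpp
#include <pthread.h>
#include <cassert>
#include <cstddef>
#include <deque>

// Owns a pthread mutex: initialized in the constructor, destroyed in the destructor.
class Mutex {
    pthread_mutex_t m;
    Mutex(const Mutex&);             // non-copyable (declared, never defined)
    Mutex& operator=(const Mutex&);
public:
    Mutex()  { pthread_mutex_init(&m, NULL); }
    ~Mutex() { pthread_mutex_destroy(&m); }
    void lock()   { pthread_mutex_lock(&m); }
    void unlock() { pthread_mutex_unlock(&m); }
};

// Holds a Mutex for the lifetime of the guard object.
class MutexLock {
    Mutex& mtx;
    MutexLock(const MutexLock&);
    MutexLock& operator=(const MutexLock&);
public:
    explicit MutexLock(Mutex& m) : mtx(m) { mtx.lock(); }
    ~MutexLock() { mtx.unlock(); }
};

template <class T>
class ProtectedBuffer {
    std::deque<T> buffer;
    Mutex mutex;  // no explicit init/destroy calls needed in ProtectedBuffer
public:
    void add_back(const T& data) {
        MutexLock lock(mutex);
        buffer.push_back(data);
    }
    // Returns false instead of reading from an empty deque.
    bool get_front(T& data) {
        MutexLock lock(mutex);
        if (buffer.empty()) return false;
        data = buffer.front();
        buffer.pop_front();
        return true;
    }
};

// Small single-threaded check of the interface.
int demo_mutex_wrapper() {
    ProtectedBuffer<int> buf;
    buf.add_back(7);
    int out = 0;
    bool ok = buf.get_front(out);
    assert(ok && out == 7);
    assert(!buf.get_front(out));  // buffer is empty again
    return out;
}
```

Because Mutex is non-copyable, a ProtectedBuffer that contains one cannot be copied by accident either.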
For the implementation you have, I think you have a good start. Since you asked about C++ifying, then if you have a compiler that supports C++11, you can use the new thread support.
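As a rough illustration of the C++11 route, here is a minimal sketch using std::mutex and std::condition_variable. The member names empty_cnd and get_front_i and the demo function are assumptions on my part, and get_front is written to block until data arrives, which may or may not be what you want for a polled buffer.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

template <class T>
class ProtectedBuffer {
    std::deque<T> buffer;
    std::mutex mtx;
    std::condition_variable empty_cnd;  // notified whenever data is added

    // Caller must hold mtx and have checked that buffer is not empty.
    void get_front_i(T& data) {
        data = buffer.front();
        buffer.pop_front();
    }
public:
    void add_back(const T& data) {
        {
            std::lock_guard<std::mutex> g(mtx);
            buffer.push_back(data);
        }
        empty_cnd.notify_one();
    }
    // Blocks until an element is available.
    void get_front(T& data) {
        std::unique_lock<std::mutex> g(mtx);
        empty_cnd.wait(g, [this] { return !buffer.empty(); });
        get_front_i(data);
    }
    // Non-blocking: returns false if the buffer is currently empty.
    bool try_get_front(T& data) {
        std::lock_guard<std::mutex> g(mtx);
        if (buffer.empty()) return false;
        get_front_i(data);
        return true;
    }
};

// A producer thread pushes 1..3; the caller consumes and sums them.
int demo_cxx11_buffer() {
    ProtectedBuffer<int> buf;
    std::thread producer([&buf] {
        for (int i = 1; i <= 3; ++i) buf.add_back(i);
    });
    int sum = 0;
    for (int i = 0; i < 3; ++i) {
        int v = 0;
        buf.get_front(v);
        sum += v;
    }
    producer.join();
    int unused = 0;
    assert(!buf.try_get_front(unused));  // everything was consumed
    return sum;
}
```

The predicate form of wait re-checks the condition after every wakeup, so spurious wakeups are handled without an explicit loop.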
You mentioned you wanted the main thread to poll this buffer, but I didn't see any mechanism that would allow it to do so. Either get_front should report an error when there is nothing in the buffer, or get_front should block the caller until data is available.
If you wanted to bound how much data you add to your buffer, you can add a similar full_cnd condition variable for the full condition, which the add_back call would wait on while the buffer is full. Then the get_front_i method could signal when the buffer is no longer full.
Since add_back is protected by mutual exclusion, if two threads call it at the same time, one thread will be blocked from calling push_back until the other thread is done.
Several things:
You need to initialize mutex with pthread_mutex_init in the constructor and free it with pthread_mutex_destroy in the destructor.
You must make your class non-copyable and non-assignable (or otherwise implement copy constructor and assignment operator correctly; see above).
It's worthwhile making a SBRM helper class for the lock:
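The helper class itself is missing from this copy of the answer, so here is a minimal sketch of what it could look like. Error handling is left out, and the demo function is only there to show that the mutex is released when the scope ends.

```cpp
#include <pthread.h>
#include <cassert>
#include <cstddef>

// Locks the mutex on construction and unlocks it on destruction,
// so the lock is released on every path out of the scope.
class Lock {
    pthread_mutex_t& m;
    Lock(const Lock&);             // non-copyable
    Lock& operator=(const Lock&);
public:
    explicit Lock(pthread_mutex_t& mtx) : m(mtx) { pthread_mutex_lock(&m); }
    ~Lock() { pthread_mutex_unlock(&m); }
};

// Returns true if the mutex was free again after the guarded scope ended.
bool demo_lock_scope() {
    pthread_mutex_t mutex;
    int rc = pthread_mutex_init(&mutex, NULL);
    assert(rc == 0);
    {
        Lock lk(mutex);
        // ... critical section ...
    }
    // trylock returns 0 only if the guard has released the mutex
    bool released = (pthread_mutex_trylock(&mutex) == 0);
    if (released) pthread_mutex_unlock(&mutex);
    pthread_mutex_destroy(&mutex);
    return released;
}
```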
Now you can make a synchronized scope like { Lock lk(mutex); /* ... */ }.
As for Question 2: Concurrent access is serialized by means of locking the mutex. One of the competing threads will sleep on the acquisition of the mutex lock.
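To make the serialization concrete, here is a small sketch in which two threads call add_back on the same object. SharedQueue, producer and kPerThread are made-up names for the demo, and the mutex is initialized in the constructor as described above. If the locking works, no push is lost and the final size is exactly twice the per-thread count.

```cpp
#include <pthread.h>
#include <cassert>
#include <cstddef>
#include <deque>

struct SharedQueue {
    std::deque<int> buffer;
    pthread_mutex_t mutex;
    SharedQueue()  { pthread_mutex_init(&mutex, NULL); }
    ~SharedQueue() { pthread_mutex_destroy(&mutex); }
    void add_back(int v) {
        pthread_mutex_lock(&mutex);    // a competing caller sleeps here
        buffer.push_back(v);
        pthread_mutex_unlock(&mutex);
    }
};

const int kPerThread = 10000;  // hypothetical workload size

// Thread entry point: pushes kPerThread values into the shared queue.
extern "C" void* producer(void* arg) {
    SharedQueue* q = static_cast<SharedQueue*>(arg);
    for (int i = 0; i < kPerThread; ++i) q->add_back(i);
    return NULL;
}

std::size_t demo_two_producers() {
    SharedQueue q;
    pthread_t a, b;
    int rc_a = pthread_create(&a, NULL, producer, &q);
    int rc_b = pthread_create(&b, NULL, producer, &q);
    assert(rc_a == 0 && rc_b == 0);
    pthread_join(a, NULL);
    pthread_join(b, NULL);
    return q.buffer.size();  // safe to read: both threads have finished
}
```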
'put the result in a buffer that is periodically polled by the main thread' - CPU waste and latency.
'Am I going about this the wrong way?' - yes. I don't know what sort of support you have in your system for secondary thread <-> GUI thread comms, but there's always the PostMessage() API.
You need a Buffer class, sure, with a data member for serial rx data and a method to do the protocol/'process data'. You don't need much else. In your second thread, create a buffer class instance. Load it up, process data and PostMessage/dispatch/BeginInvoke its pointer to your GUI thread. IN THE VERY NEXT LINE OF CODE IN THE SERIAL THREAD, create another instance in the same instance pointer var for the next load of data from the serial port. After display/logging/whatever in the GUI, the GUI thread should delete() the *Buffer it received.
No latency, no CPU waste, no data copying, no chance of the serial thread and the GUI thread ever working on the same buffer instance, no nasty, complex buffer-sharing code, no locks, no hassle. It will just work well.
Anything else is going to be messy.
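A portable sketch of that hand-off, with heavy caveats: MessageQueue below is a hypothetical stand-in for whatever your platform provides (PostMessage, BeginInvoke, etc.), Buffer is a made-up type, and the demo drains the queue after joining the thread purely so it can run on its own. In a real GUI the message loop would deliver each pointer as it arrives. The point being illustrated is the ownership transfer: once a buffer is posted, the serial thread never touches it again.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>

// Hypothetical buffer type: raw serial bytes plus a processing step.
struct Buffer {
    std::string rx;
    void process() { /* protocol handling would go here */ }
};

// Hypothetical stand-in for the platform's message queue.
// Any locking lives in here, not in Buffer.
class MessageQueue {
    std::mutex m;
    std::queue<std::unique_ptr<Buffer>> q;
public:
    void post(std::unique_ptr<Buffer> b) {
        std::lock_guard<std::mutex> g(m);
        q.push(std::move(b));
    }
    std::unique_ptr<Buffer> poll() {  // returns null if nothing is pending
        std::lock_guard<std::mutex> g(m);
        if (q.empty()) return nullptr;
        std::unique_ptr<Buffer> b = std::move(q.front());
        q.pop();
        return b;
    }
};

int demo_handoff() {
    MessageQueue to_gui;
    std::thread serial([&to_gui] {
        std::unique_ptr<Buffer> buf(new Buffer);
        for (int i = 0; i < 3; ++i) {
            buf->rx = "frame";            // pretend this came from the port
            buf->process();
            to_gui.post(std::move(buf));  // ownership passes to the GUI side
            buf.reset(new Buffer);        // immediately start a fresh instance
        }
    });
    serial.join();
    int received = 0;
    while (std::unique_ptr<Buffer> b = to_gui.poll()) {
        assert(b->rx == "frame");
        ++received;  // b is deleted at the end of each iteration
    }
    return received;
}
```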
Edit - forgot about (2) - don't know, wouldn't touch it with a barge-pole.