Thread-safe way to build mutex protection into a C++ class

Published 2019-04-12 12:17

Question:

I'm trying to implement a producer/consumer multithreaded program in C++ for a project I'm working on. The basic idea is that the main thread creates a second thread to watch a serial port for new data, process the data, and put the result in a buffer that is periodically polled by the main thread. I've never written multi-threaded programs before. I've been reading lots of tutorials, but they're all in C. I think I've got a handle on the basic concepts, but I'm trying to c++ify it. For the buffer, I want to create a data class with mutex protection built in. This is what I came up with:

1) Am I going about this the wrong way? Is there a smarter way to implement a protected data class?

2) What will happen in the following code if two threads try to call ProtectedBuffer::add_back() at the same time?

#include <deque>
#include "pthread.h"

template <class T>
class ProtectedBuffer {
  std::deque<T> buffer;
  pthread_mutex_t mutex;
public:
  void add_back(T data) {
    pthread_mutex_lock(&mutex);
    buffer.push_back(data);
    pthread_mutex_unlock(&mutex);
  }
  void get_front(T &data) {
    pthread_mutex_lock(&mutex);
    data = buffer.front();
    buffer.pop_front();
    pthread_mutex_unlock(&mutex);
  }
};

Edit: Thanks for all the great suggestions. I've tried to implement them below. I also added some error checking so that if a thread somehow manages to try to lock the same mutex twice, it will fail gracefully. I think.

#include "pthread.h"
#include <deque>


class Lock {
    pthread_mutex_t &m;
    bool locked;
    int error;
public:
    explicit Lock(pthread_mutex_t & _m) : m(_m) {
        error = pthread_mutex_lock(&m);
        if (error == 0) {
            locked = true;
        } else {
            locked = false;
        }
    }
    ~Lock() {
        if (locked)
            pthread_mutex_unlock(&m);
    }
    bool is_locked() {
        return locked;
    }
};

class TryToLock {
    pthread_mutex_t &m;
    bool locked;
    int error;
public:
    explicit TryToLock(pthread_mutex_t & _m) : m(_m) {
        error = pthread_mutex_trylock(&m);
        if (error == 0) {
            locked = true;
        } else {
            locked = false;
        }
    }
    ~TryToLock() {
        if (locked)
            pthread_mutex_unlock(&m);
    }
    bool is_locked() {
        return locked;
    }
};

template <class T>
class ProtectedBuffer{
    pthread_mutex_t mutex;
    pthread_mutexattr_t mattr;
    std::deque<T> buffer;
    bool failbit;

    ProtectedBuffer(const ProtectedBuffer& x);
    ProtectedBuffer& operator= (const ProtectedBuffer& x);
public:
    ProtectedBuffer() {
        pthread_mutexattr_init(&mattr);
        pthread_mutexattr_settype(&mattr, PTHREAD_MUTEX_ERRORCHECK);
        pthread_mutex_init(&mutex, &mattr);
        failbit = false;
    }
    ~ProtectedBuffer() {
        pthread_mutex_destroy(&mutex);
        pthread_mutexattr_destroy(&mattr);
    }
    void add_back(T &data) {
        Lock lck(mutex);
        if (!lck.is_locked()) {
            failbit = true;
            return;
        }
        buffer.push_back(data);
        failbit = false;
    }
    void get_front(T &data) {
        Lock lck(mutex);
        if (!lck.is_locked()) {
            failbit = true;
            return;
        }
        if (buffer.empty()) {
            failbit = true;
            return;
        }
        data = buffer.front();
        buffer.pop_front();
        failbit = false;
    }
    void try_get_front(T &data) {
        TryToLock lck(mutex);
        if (!lck.is_locked()) {
            failbit = true;
            return;
        }
        if (buffer.empty()) {
            failbit = true;
            return;
        }
        data = buffer.front();
        buffer.pop_front();
        failbit = false;
    }
    void try_add_back(T &data) {
        TryToLock lck(mutex);
        if (!lck.is_locked()) {
            failbit = true;
            return;
        }
        buffer.push_back(data);
        failbit = false;
    }
};

Answer 1:

Several things:

  • You need to initialize mutex with pthread_mutex_init in the constructor and free it with pthread_mutex_destroy in the destructor.

  • You must make your class non-copyable and non-assignable (or otherwise implement copy constructor and assignment operator correctly; see above).

  • It's worthwhile making an SBRM (scope-bound resource management, i.e. RAII) helper class for the lock:

    class Lock
    {
        pthread_mutex_t & m;
    public:
        explicit Lock(pthread_mutex_t & _m) : m(_m) { pthread_mutex_lock(&m); }
        ~Lock() { pthread_mutex_unlock(&m); }
    };
    

    Now you can make a synchronized scope like { Lock lk(mutex); /* ... */ }.
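    Putting those bullets together, the whole class might look something like this (a sketch in the same pre-C++11 pthread idiom; class and variable names are just illustrative):

    ```cpp
    #include <cstddef>
    #include <deque>
    #include <pthread.h>

    // RAII helper: lock on construction, unlock on destruction.
    class Lock {
        pthread_mutex_t &m;
    public:
        explicit Lock(pthread_mutex_t &_m) : m(_m) { pthread_mutex_lock(&m); }
        ~Lock() { pthread_mutex_unlock(&m); }
    };

    template <class T>
    class ProtectedBuffer {
        std::deque<T> buffer;
        pthread_mutex_t mutex;

        // Non-copyable, non-assignable: declared but never defined (pre-C++11 style).
        ProtectedBuffer(const ProtectedBuffer &);
        ProtectedBuffer &operator=(const ProtectedBuffer &);
    public:
        ProtectedBuffer()  { pthread_mutex_init(&mutex, NULL); }   // init in ctor
        ~ProtectedBuffer() { pthread_mutex_destroy(&mutex); }      // destroy in dtor

        void add_back(T data) {
            Lock lk(mutex);          // synchronized scope
            buffer.push_back(data);
        }
        void get_front(T &data) {
            Lock lk(mutex);
            data = buffer.front();
            buffer.pop_front();
        }
    };
    ```

    The destructor of Lock runs on every exit path, so the mutex is released even if push_back throws.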

As for Question 2: Concurrent access is serialized by means of locking the mutex. One of the competing threads will sleep on the acquisition of the mutex lock.



Answer 2:

Am I going about this the wrong way? Is there a smarter way to implement a protected data class?

For the implementation you have, I think you have a good start. Since you asked about C++ifying, then if you have a compiler that supports C++11, you can use the new thread support.

You mentioned you wanted the main thread to poll this buffer, but I didn't see any mechanism that would allow it to do so. Either get_front should report an error when there is nothing in the buffer, or it should block the caller until data is available.

#include <deque>
#include <mutex>
#include <condition_variable>
#include <stdexcept>

template <class T>
class ProtectedBuffer {
  std::deque<T> buffer;
  std::mutex mtx;
  std::condition_variable empty_cnd;
  void get_front_i(T &data) {
    data = buffer.front();
    buffer.pop_front();
  }
public:
  void add_back(T data) {
    std::lock_guard<std::mutex> g(mtx);
    bool was_empty = buffer.empty();
    buffer.push_back(data);
    if (was_empty) empty_cnd.notify_one();
  }
  void get_front_check(T &data) {
    std::lock_guard<std::mutex> g(mtx);
    if (buffer.empty()) throw std::underflow_error("no data");
    get_front_i(data);
  }
  void get_front_block(T &data) {
    std::unique_lock<std::mutex> u(mtx);
    while (buffer.empty()) empty_cnd.wait(u);
    get_front_i(data);
    if (!buffer.empty()) empty_cnd.notify_one();
  }
};

If you wanted to bound how much data you add to your buffer, you can add a similar full_cnd condition variable to check for the full condition on which the add_back call would wait on if it were true. Then, the get_front_i method could signal when the buffer wasn't full anymore.
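That bounded variant might look like the following sketch (the class name, the capacity parameter, and full_cnd are the additions; the rest mirrors the class above):

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

template <class T>
class BoundedBuffer {
  std::deque<T> buffer;
  std::size_t capacity;
  std::mutex mtx;
  std::condition_variable empty_cnd; // signaled when data becomes available
  std::condition_variable full_cnd;  // signaled when space becomes available
public:
  explicit BoundedBuffer(std::size_t cap) : capacity(cap) {}

  void add_back(T data) {
    std::unique_lock<std::mutex> u(mtx);
    while (buffer.size() >= capacity)  // wait for space
      full_cnd.wait(u);
    buffer.push_back(data);
    empty_cnd.notify_one();            // data is now available
  }
  void get_front(T &data) {
    std::unique_lock<std::mutex> u(mtx);
    while (buffer.empty())             // wait for data
      empty_cnd.wait(u);
    data = buffer.front();
    buffer.pop_front();
    full_cnd.notify_one();             // space is now available
  }
};
```

Note the waits are in `while` loops, not `if`s, to guard against spurious wakeups.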

What will happen in the following code if two threads try to call ProtectedBuffer::add_back() at the same time?

Since add_back is protected by mutual exclusion, if two threads call it at the same time, one thread will be blocked from calling push_back until the other thread is done.



Answer 3:

You have the basics, but I would take it a step further by wrapping the mutex itself in its own RAII wrapper, eg:

#include <deque> 
#include "pthread.h" 

class ProtectedMutex
{
  pthread_mutex_t &mutex; 
public:
  ProtectedMutex(pthread_mutex_t &m)
    : mutex(m)
  {
    pthread_mutex_lock(&mutex); 
  }
  ~ProtectedMutex()
  {
    pthread_mutex_unlock(&mutex); 
  }
};

template <class T> 
class ProtectedBuffer { 
  std::deque<T> buffer; 
  pthread_mutex_t mutex; 
public: 
  void add_back(T data) { 
    ProtectedMutex m(mutex); 
    buffer.push_back(data); 
  } 
  void get_front(T &data) { 
    ProtectedMutex m(mutex); 
    data = buffer.front(); 
    buffer.pop_front(); 
  } 
}; 


Answer 4:

'put the result in a buffer that is periodically polled by the main thread' - polling wastes CPU and adds latency.

'Am I going about this the wrong way?' - yes. I don't know what sort of support your system has for secondary thread <-> GUI thread comms, but there's always the PostMessage() API.

You need a Buffer class, sure, with a data member for the serial rx data and a method to do the protocol/'process data' step. You don't need much else. In your second thread, create a Buffer instance, load it up, process the data, and PostMessage/dispatch/BeginInvoke its pointer to your GUI thread. In the very next line of code in the serial thread, create another instance in the same pointer variable for the next load of data from the serial port. After display/logging/whatever, the GUI thread should delete the Buffer it received.

No latency, no CPU waste, no data copying, no chance of the serial thread and the GUI thread ever working on the same buffer instance, no nasty, complex buffer-sharing code, no locks, no hassle. It will just work well.
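Outside of a Win32 message loop, the same handoff pattern can be approximated in portable C++11. Everything here is an illustrative stand-in: Buffer is a hypothetical data holder, and post_to_gui/gui_receive play the role of PostMessage/GetMessage over a stand-in message queue (the real OS queue is what lets the pattern itself be lock-free for you):

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

struct Buffer {                 // illustrative holder for serial rx data
  std::string rx_data;
};

// Stand-in for the OS message queue behind PostMessage/GetMessage.
static std::queue<Buffer *> msg_queue;
static std::mutex mq_mtx;
static std::condition_variable mq_cnd;

void post_to_gui(Buffer *b) {   // stand-in for PostMessage
  std::lock_guard<std::mutex> g(mq_mtx);
  msg_queue.push(b);
  mq_cnd.notify_one();
}

Buffer *gui_receive() {         // stand-in for GetMessage
  std::unique_lock<std::mutex> u(mq_mtx);
  while (msg_queue.empty()) mq_cnd.wait(u);
  Buffer *b = msg_queue.front();
  msg_queue.pop();
  return b;
}

void serial_thread() {
  Buffer *buf = new Buffer;     // load it up, process data...
  buf->rx_data = "packet 1";
  post_to_gui(buf);             // hand the pointer off to the GUI thread,
  buf = new Buffer;             // ...then immediately start a fresh instance
  buf->rx_data = "packet 2";
  post_to_gui(buf);
  post_to_gui(nullptr);         // sentinel: no more data
}

std::vector<std::string> run_demo() {
  std::vector<std::string> seen;
  std::thread producer(serial_thread);
  while (Buffer *b = gui_receive()) {  // "GUI" side: receive, use, delete
    seen.push_back(b->rx_data);
    delete b;                          // the receiver now owns the buffer
  }
  producer.join();
  return seen;
}
```

The two threads never touch the same Buffer instance at the same time; ownership transfers with the pointer, and the only shared state is the queue itself.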

Anything else is going to be messy.

Edit: forgot about (2). Don't know; I wouldn't touch it with a barge-pole.