Boost synchronization

Posted 2019-04-11 18:50

Question:

I have NUM_THREADS threads, each running the following code:

/*
Calculate some_value;
*/

//Critical section to accumulate all thresholds
{
    boost::mutex::scoped_lock lock(write_mutex);
    T += some_value;
    num_threads++;
    if (num_threads == NUM_THREADS){
        T = T/NUM_THREADS;
        READY = true;
        cond.notify_all();
        num_threads = 0;
    }
}

//Wait for average threshold to be ready
if (!READY)
{
    boost::unique_lock<boost::mutex> lock(wait_mutex);
    while (!READY){
        cond.wait(lock);
    }
}
//End critical section

/*
do_something;
*/

Basically, I want all the threads to wait for the READY signal before continuing. num_threads is set to 0 and READY is set to false before the threads are created. Once in a while, a deadlock occurs. Can anyone help, please? All the Boost variables are declared globally as follows:

boost::mutex write_mutex;
boost::mutex wait_mutex;
boost::condition cond;

Answer 1:

The code has a race condition on the READY flag (which I assume is just a bool variable). What may happen (i.e. one possible variant of thread execution interleaving) is:

Thread T1:                                 Thread T2:
if (!READY)                                
{
    unique_lock<mutex> lock(wait_mutex);   mutex::scoped_lock lock(write_mutex);
    while (!READY)                         /* ... */
    {                                      READY = true;
        /* !!! */                          cond.notify_all();
        cond.wait(lock);
    }
}

The code testing the READY flag is not synchronized with the code setting it (note that the two critical sections lock different mutexes). If T1 is in the gap between testing the flag and calling cond.wait (marked /* !!! */), T2 may set the flag and notify cond before T1 has started waiting. A condition variable does not remember notifications, so T1 misses the signal and blocks forever.
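To make the missed signal concrete, here is a small single-threaded sketch. It uses the standard-library std::mutex and std::condition_variable as stand-ins for the Boost types (an assumption made so the snippet needs no Boost installation), and both function names are hypothetical. The waits are bounded by a timeout so the snippet terminates instead of hanging.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// A notification sent while nobody is waiting is simply dropped.
// Returns true if the later wait ran into its timeout.
bool bare_notification_is_lost() {
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;                 // never set in this function

    cv.notify_all();                    // no waiter yet: nothing is recorded

    std::unique_lock<std::mutex> lock(m);
    // The predicate form returns the final value of the predicate,
    // so `false` here means the wait timed out.
    return !cv.wait_for(lock, std::chrono::milliseconds(50),
                        [&] { return ready; });
}

// When the flag is written under the same mutex the waiter holds while
// testing it, a late waiter sees the flag and never needs the notification.
bool late_waiter_sees_flag() {
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;

    {
        std::lock_guard<std::mutex> guard(m);
        ready = true;                   // state change under the waiter's mutex
    }
    cv.notify_all();                    // still nobody waiting

    std::unique_lock<std::mutex> lock(m);
    return cv.wait_for(lock, std::chrono::milliseconds(50),
                       [&] { return ready; });
}
```

The first function corresponds to T1 arriving at the wait after T2 has already signalled; in the question's code there is no timeout, so that thread would wait indefinitely.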

The simplest solution is to lock the right mutex for the update of READY and condition notification:

/*...*/
T = T/NUM_THREADS;
{
    boost::mutex::scoped_lock lock(wait_mutex);
    READY = true;
    cond.notify_all();
}
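As a fuller illustration of the same idea, the sketch below guards every piece of shared state (the running total, the arrival counter and the ready flag) with one mutex, which removes the need for a second lock altogether. It is written with the standard-library equivalents of the Boost types, and the class name AverageRendezvous and the driver run_rendezvous are hypothetical names introduced only for this example.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// One-shot rendezvous: every thread contributes a value, then all of them
// wait until the average is ready. A single mutex guards all shared state.
class AverageRendezvous {
public:
    explicit AverageRendezvous(int num_threads) : expected_(num_threads) {
        assert(num_threads > 0);
    }

    // Adds `value`, blocks until all threads have arrived,
    // then returns the average of all contributed values.
    double contribute_and_wait(double value) {
        std::unique_lock<std::mutex> lock(mutex_);
        total_ += value;
        if (++arrived_ == expected_) {
            // Last thread to arrive: publish the result and wake the rest.
            total_ /= expected_;
            ready_ = true;
            cond_.notify_all();
        } else {
            // The flag is tested under the same mutex that protects its
            // update, so the notification cannot slip in between.
            cond_.wait(lock, [this] { return ready_; });
        }
        return total_;
    }

private:
    std::mutex mutex_;
    std::condition_variable cond_;
    const int expected_;
    int arrived_ = 0;
    double total_ = 0.0;
    bool ready_ = false;
};

// Hypothetical driver: thread i contributes the value i + 1.
std::vector<double> run_rendezvous(int num_threads) {
    AverageRendezvous rendezvous(num_threads);
    std::vector<double> results(num_threads);
    std::vector<std::thread> threads;
    for (int i = 0; i < num_threads; ++i) {
        threads.emplace_back([&rendezvous, &results, i] {
            results[i] = rendezvous.contribute_and_wait(i + 1.0);
        });
    }
    for (std::thread& t : threads) t.join();
    return results;
}
```

With four threads contributing 1, 2, 3 and 4, every thread should come back with the average 2.5. Note that this sketch is single-use: the flag is never cleared, so a second round would need a reset step or a barrier.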


Answer 2:

It looks like Boost.Thread's barriers might be what you need.

Here's a working example that averages values provided by several worker threads. All worker threads share the same barrier (via the accumulator instance) to synchronize with one another.

#include <cstdlib>
#include <iostream>
#include <vector>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>

boost::mutex coutMutex;
typedef boost::lock_guard<boost::mutex> LockType;

class Accumulator
{
public:
    Accumulator(int count) : barrier_(count), sum_(0), count_(count) {}

    void accumulateAndWait(float value)
    {
        {
            // Increment value
            LockType lock(mutex_);
            sum_ += value;
        }
        barrier_.wait(); // Wait for the other threads to reach the barrier.
    }

    void wait() {barrier_.wait();} // Wait on barrier without changing sum.

    float sum() {LockType lock(mutex_); return sum_;} // Return current sum

    float average() {LockType lock(mutex_); return sum_ / count_;}

    // Reset the sum. The barrier is automatically reset when triggered.
    void reset() {LockType lock(mutex_); sum_ = 0;}

private:
    typedef boost::lock_guard<boost::mutex> LockType;
    boost::barrier barrier_;
    boost::mutex mutex_;
    float sum_;
    int count_;
};

/*  Posts a value for the accumulator to add and waits for other threads
    to do the same. */
void workerFunction(Accumulator& accumulator)
{
    // Sleep for a random amount of time before posting value
    int randomMilliseconds = std::rand() % 3000;
    boost::posix_time::time_duration randomDelay =
            boost::posix_time::milliseconds(randomMilliseconds);
    boost::this_thread::sleep(randomDelay);

    // Post some random value
    float value = std::rand() % 100;

    {
        LockType lock(coutMutex);
        std::cout << "Thread " << boost::this_thread::get_id() << " posting "
                  << value << " after " << randomMilliseconds << "ms\n";
    }
    accumulator.accumulateAndWait(value);

    float avg = accumulator.average();

    // Print a message to indicate this thread is past the barrier.
    {
        LockType lock(coutMutex);
        std::cout << "Thread " << boost::this_thread::get_id() << " unblocked. "
                  << "Average = " << avg << "\n" << std::flush;
    }
}

int main()
{
    int workerThreadCount = 5;
    Accumulator accumulator(workerThreadCount);

    // Create and launch worker threads
    boost::thread_group threadGroup;
    for (int i=0; i<workerThreadCount; ++i)
    {
        threadGroup.create_thread(
                boost::bind(&workerFunction, boost::ref(accumulator)));
    }

    // Wait for all worker threads to finish
    threadGroup.join_all();
    {
        LockType lock(coutMutex);
        std::cout << "All worker threads finished\n" << std::flush;
    }

    /* Pause a bit before exiting, to give worker threads a chance to
       print their messages. */
    boost::this_thread::sleep(boost::posix_time::seconds(1));
}

I get the following output:

Thread 0x100100f80 posting 72 after 1073ms
Thread 0x100100d30 posting 44 after 1249ms
Thread 0x1001011d0 posting 78 after 1658ms
Thread 0x100100ae0 posting 23 after 1807ms
Thread 0x100101420 posting 9 after 1930ms
Thread 0x100101420 unblocked. Average = 45.2
Thread 0x100100f80 unblocked. Average = 45.2
Thread 0x100100d30 unblocked. Average = 45.2
Thread 0x1001011d0 unblocked. Average = 45.2
Thread 0x100100ae0 unblocked. Average = 45.2
All worker threads finished
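For readers curious how a barrier can reset itself after each release, the following is a minimal sketch of one common approach, a generation counter. It is not Boost's actual implementation; it uses standard-library primitives, and ReusableBarrier and count_barrier_releases are hypothetical names for illustration only.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Reusable barrier built from one mutex and one condition variable.
class ReusableBarrier {
public:
    explicit ReusableBarrier(int count)
        : threshold_(count), remaining_(count) {
        assert(count > 0);
    }

    // Blocks until `count` threads have called wait().
    // Returns true in exactly one thread per round (the last to arrive).
    bool wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        const unsigned long my_generation = generation_;
        if (--remaining_ == 0) {
            // Last arrival: start a new generation and re-arm the counter,
            // so the barrier is immediately usable for the next round.
            ++generation_;
            remaining_ = threshold_;
            cond_.notify_all();
            return true;
        }
        // Waiters watch the generation number rather than a bool flag,
        // so nothing has to be cleared between rounds.
        cond_.wait(lock, [&] { return generation_ != my_generation; });
        return false;
    }

private:
    std::mutex mutex_;
    std::condition_variable cond_;
    const int threshold_;
    int remaining_;
    unsigned long generation_ = 0;
};

// Hypothetical driver: `num_threads` threads pass the barrier `rounds` times.
// Returns how many times wait() reported "last to arrive".
int count_barrier_releases(int num_threads, int rounds) {
    ReusableBarrier barrier(num_threads);
    std::mutex count_mutex;
    int releases = 0;
    std::vector<std::thread> threads;
    for (int i = 0; i < num_threads; ++i) {
        threads.emplace_back([&] {
            for (int r = 0; r < rounds; ++r) {
                if (barrier.wait()) {
                    std::lock_guard<std::mutex> guard(count_mutex);
                    ++releases;
                }
            }
        });
    }
    for (std::thread& t : threads) t.join();
    return releases;
}
```

Because exactly one thread per round sees true, the release count should equal the number of rounds. The generation counter is what lets a fast thread re-enter the barrier for the next round while slower threads from the previous round are still waking up.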