Creating a Blocking Queue

2019-08-07 11:25发布

Sometimes this implementation and execution of BlockingQueue just works. Sometimes it segfaults. Any idea why?

#include <thread>
using std::thread;
#include <mutex>
using std::mutex;
#include <iostream>
using std::cout;
using std::endl;
#include <queue>
using std::queue;
#include <string>
using std::string;
using std::to_string;
#include <functional>
using std::ref;

template <typename T>
class BlockingQueue {
private:
    mutex mutex_;
    queue<T> queue_;
public:
    T pop() {
        this->mutex_.lock();
        T value = this->queue_.front();
        this->queue_.pop();
        this->mutex_.unlock();
        return value;
    }

    void push(T value) {
        this->mutex_.lock();
        this->queue_.push(value);
        this->mutex_.unlock();
    }

    bool empty() {
        this->mutex_.lock();
        bool check = this->queue_.empty();
        this->mutex_.unlock();
        return check;
    }
};

void fillWorkQueue(BlockingQueue<string>& workQueue) {
    int size = 40000;
    for(int i = 0; i < size; i++)
        workQueue.push(to_string(i));
}

void doWork(BlockingQueue<string>& workQueue) {
    while(!workQueue.empty()) {
        workQueue.pop();
    }   
}

void multiThreaded() {
    BlockingQueue<string> workQueue;
    fillWorkQueue(workQueue);
    thread t1(doWork, ref(workQueue));
    thread t2(doWork, ref(workQueue));
    t1.join();
    t2.join();
    cout << "done\n";
}

int main() {
    cout << endl;

    // Multi Threaded
    cout << "multiThreaded\n";
    multiThreaded();
    cout << endl;
}

2条回答
爷的心禁止访问
2楼-- · 2019-08-07 11:37

The problem should lay here:

while(!itemQueue.empty()) {
    itemQueue.pop();
}

You reserve the mutex when checking of a value is left, then you release the mutex and it might happen that another thread is executed, finds out that a value is left and pops it. In the worst case no item is left afterwards and the first thread tries to pop while no element is left.

The solution is to make the front/pop calls on the internal queue in the same section than the check for empty in the same locked section, then the behavior would always be defined.

Another suggestion would be to use std::lock_guard when working with mutex because it improves the readability and does ensure that the mutex is released no matter what happens.

Considering the fact those two advice, your pop method could look like:

T pop() {
    std::lock_guard lock(this->mutex_); //mutex_ is locked
    T value;
    if( !this->queue_.empty() )
    {
        value = this->queue_.front();
        this->queue_.pop();
    }
    return value;
} //mutex_ is freed
查看更多
闹够了就滚
3楼-- · 2019-08-07 11:52

See here:

What do I get from front() of empty std container?

Bad things happen if you call .front() on an empty container, better check .empty() first.

Try:

T pop() {
    this->mutex_.lock();
    T value;
    if( !this->queue_.empty() )
    {
        value = this->queue_.front();  // undefined behavior if queue_ is empty
                                       // may segfault, may throw, etc.
        this->queue_.pop();
    }
    this->mutex_.unlock();
    return value;
}

Note: Since atomic operations are important on this kind of queue, I'd recommend API changes:

bool pop(T &t);  // returns false if there was nothing to read.

Better yet, if you're actually using this where it matters, you probably want to mark items in use before deleting in case of failure.

bool peekAndMark(T &t);  // allows one "marked" item per thread
void deleteMarked();     // if an item is marked correctly, pops it.
void unmark();           // abandons the mark. (rollback)
查看更多
登录 后发表回答