Is this a correct way to implement a bounded buffer?

Posted 2019-02-23 11:46

Question:

I am working on a program that deals with multiple threads accessing, depositing in, and withdrawing from a bounded buffer container. I have noticed some major problems with the threads and suspect that my buffer is partially or fundamentally incorrect somewhere.

To be sure I know what I am doing with this, I would appreciate having my buffer code looked over. The class uses a Semaphore that I implemented elsewhere, which I will assume works for now (if not, I'll figure that out soon enough!). I have added comments that attempt to explain my reasoning.

First, the .h file:

#ifndef BOUNDED_BUFFER_H
#define BOUNDED_BUFFER_H

#include "Semaphore.H"
#include <string> 
#include <vector>  

using namespace std; 

class Item{ //supposed to eventually be more extensible...seems silly with just a string for now

  public:
    Item(string _content){content = _content;} 
    string GetContent() {return content;}     

  private:
    string content; //the member that the constructor and GetContent() refer to
};   

class BoundedBuffer{

  public:
    BoundedBuffer();

    void Deposit(Item* _item);
    Item* Retrieve();
    int GetNumItems() {return count;}
    vector<Item*> GetBuffer() {return buffer;}
    void SetSize(int _size){
      capacity = _size;
      buffer.reserve(_size);  //or do I need to call "resize"
    }

  private:
    int capacity;
    vector<Item*> buffer; //I originally wanted to use an array but had trouble with
                          //initialization/sizing, etc.
    int nextin;
    int nextout;
    int count;

    Semaphore notfull;   //wait on this one before depositing an item
    Semaphore notempty;  //wait on this one before retrieving an item
};

#endif

Next, the .cpp:

#include "BoundedBuffer.H"
#include <iostream>

using namespace std; 

BoundedBuffer::BoundedBuffer(){

  notfull.SetValue(0); 
  notempty.SetValue(0); 
  nextin = 0; 
  nextout = 0; 
  count = 0; 
}

void BoundedBuffer::Deposit(Item* _item){
  if(count == capacity){ 
    notfull.P(); //Cannot deposit into full buffer; wait
  }

  buffer[nextin] = _item; 
  nextin = (nextin + 1) % capacity;  //wrap-around
  count += 1;
  notempty.V();  //Signal that retrieval is safe 
}

Item* BoundedBuffer::Retrieve(){
  if(count == 0){
    notempty.P(); //cannot take from empty buffer; wait 
  }

  Item* x = buffer[nextout]; 
  nextout = (nextout + 1) % capacity;
  buffer.pop_back();  //or a different erase methodology? 
  count -= 1; 
  notfull.V(); //Signal that deposit is safe 
  return x; 
}

I think that issues could arise from my choice of a vector as the underlying container (or, rather, an incorrect use of it), or perhaps from the need for more blocking mechanisms for safety (mutex locks, etc.). From the looks of things, can anyone offer some feedback?

Answer 1:

This is an extremely common question (how to write a proper multithreaded queue). The best answers I have seen are this Stack Overflow question and this web site. Those answers cover unbounded queues, so I will expand on them and show an answer for a bounded queue here.

You need to protect your Deposit and Retrieve functions with a mutex and use condition variables to do the wake-ups.

#include <mutex>
#include <condition_variable>

std::mutex the_mutex;
std::condition_variable the_notfull_cvar;
std::condition_variable the_notempty_cvar;

...

void BoundedBuffer::Deposit(Item* _item){
  std::unique_lock<std::mutex> lock(the_mutex);
  while ( /* buffer is full */ ){
    /* simultaneously wait and release the mutex */
    the_notfull_cvar.wait(lock);
    /* the mutex is reacquired at this point */
  }

  /* buffer has space and we own the mutex: insert the item */
  ...
  /* tell anyone waiting on an empty buffer that they can wake up. */
  the_notempty_cvar.notify_all();
}

Item* BoundedBuffer::Retrieve(){
  std::unique_lock<std::mutex> lock(the_mutex);
  while ( /* buffer is empty */ ){
    /* simultaneously wait and release the mutex */
    the_notempty_cvar.wait(lock);
    /* the mutex is reacquired at this point */
  }

  /* buffer has something in it and we own the mutex: get the item */
  ...
  /* tell anyone waiting on a full buffer that they can wake up. */
  the_notfull_cvar.notify_all();

  return x;
}

Your GetNumItems(), GetBuffer() and SetSize() functions also need to be protected with unique_locks.
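
For reference, here is one way the outline above could be filled in. Treat it as a minimal sketch under a few assumptions, not a definitive implementation: the capacity is fixed in the constructor instead of through SetSize(), the mutex and condition variables are class members rather than globals, Item is reduced to a hypothetical struct, and transfer_in_order() is a made-up helper used only to exercise the class. The vector is constructed with `capacity` real elements (the effect resize would have); reserve alone only allocates storage, so indexing into the vector after it is out of bounds.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for the asker's Item class.
struct Item {
    std::string content;
};

class BoundedBuffer {
public:
    // Assumption: capacity is fixed at construction and must be positive.
    explicit BoundedBuffer(std::size_t capacity) : buffer_(capacity) {
        assert(capacity > 0);
    }

    void Deposit(Item* item) {
        std::unique_lock<std::mutex> lock(mutex_);
        // The predicate form of wait() re-checks the condition after every wakeup.
        notfull_.wait(lock, [this] { return count_ < buffer_.size(); });
        buffer_[nextin_] = item;
        nextin_ = (nextin_ + 1) % buffer_.size();  // wrap-around
        ++count_;
        notempty_.notify_all();  // wake threads waiting on an empty buffer
    }

    Item* Retrieve() {
        std::unique_lock<std::mutex> lock(mutex_);
        notempty_.wait(lock, [this] { return count_ > 0; });
        Item* x = buffer_[nextout_];
        nextout_ = (nextout_ + 1) % buffer_.size();  // no pop_back: slots are reused
        --count_;
        notfull_.notify_all();  // wake threads waiting on a full buffer
        return x;
    }

    std::size_t GetNumItems() {
        std::lock_guard<std::mutex> lock(mutex_);
        return count_;
    }

private:
    std::mutex mutex_;
    std::condition_variable notfull_;
    std::condition_variable notempty_;
    std::vector<Item*> buffer_;
    std::size_t nextin_ = 0;
    std::size_t nextout_ = 0;
    std::size_t count_ = 0;
};

// Hypothetical helper: one producer thread pushes n items through a 2-slot
// buffer while the calling thread consumes them. Returns how many items
// arrived in the order they were deposited.
int transfer_in_order(int n) {
    BoundedBuffer buf(2);
    std::vector<Item> items(n);
    for (int i = 0; i < n; ++i) items[i].content = std::to_string(i);

    std::thread producer([&] {
        for (Item& it : items) buf.Deposit(&it);
    });

    int in_order = 0;
    for (int i = 0; i < n; ++i) {
        if (buf.Retrieve()->content == std::to_string(i)) ++in_order;
    }
    producer.join();
    return in_order;
}
```

With a single producer and a single consumer the items come out in FIFO order, so transfer_in_order(n) should return n. With several producers or consumers the buffer stays consistent, but the interleaving between threads is no longer predictable.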



Answer 2:

Have you looked at Boost's circular_buffer (boost::circular_buffer)? It is a fixed-size storage arena with a standard-library-style interface. It may do what you want, or at least give you some pointers. Note that the circular buffer overwrites its oldest element if you write to it when it is full. You may not want that, but there is a full() test, so you can avoid it in your code.
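
To make the overwrite behavior concrete without pulling in Boost, here is a small standard-library sketch. OverwritingRing and try_push are hypothetical names invented for this illustration; the class only mimics the behavior described above and is not Boost's implementation or API. It is also not thread-safe on its own.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in that mimics overwrite-when-full behavior.
class OverwritingRing {
public:
    explicit OverwritingRing(std::size_t capacity) : slots_(capacity) {
        assert(capacity > 0);
    }

    bool full() const { return count_ == slots_.size(); }
    std::size_t size() const { return count_; }

    // Appends a value; when the ring is full, the oldest value is overwritten.
    void push_back(int value) {
        slots_[(head_ + count_) % slots_.size()] = value;
        if (full()) {
            head_ = (head_ + 1) % slots_.size();  // the oldest element is lost
        } else {
            ++count_;
        }
    }

    // Oldest value still stored. Precondition: size() > 0.
    int front() const { return slots_[head_]; }

private:
    std::vector<int> slots_;
    std::size_t head_ = 0;   // index of the oldest element
    std::size_t count_ = 0;  // number of stored elements
};

// Guarded insert: refuses to add instead of overwriting.
bool try_push(OverwritingRing& ring, int value) {
    if (ring.full()) return false;
    ring.push_back(value);
    return true;
}
```

Checking full() before writing, as try_push does, turns the silent overwrite into an explicit failure that the caller can handle, for example by waiting until a consumer has removed an element.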