Non-busy blocking Queue Implementation in C

Posted 2020-06-25 04:59

Question:

I am trying to implement a queue in C that makes a process block, without busy-waiting, until there is an element in the queue to consume. I have tried two different approaches to achieve this.

The first problem is that if the enqueue/dequeue operations check the bounds with an if conditional ( if(q->count == QUEUESIZE) ), the call to sem_wait returns immediately because no other process has obtained the lock.

If I change the conditional to while(q->count == QUEUESIZE), I believe the consumer process will 'busy wait' until the semaphore is posted by the producer process, which is not the goal of the implementation. Through testing, I've also found that the consumer process never acquires the lock and never continues.

I think that I am close, but I can't seem to figure out how to work around these issues. I have thought about adding condition variables or pthread_mutex, but wanted to exhaust the semaphore options before adding in additional complexity.

#define QUEUESIZE 48

typedef struct 
{           
    char q[QUEUESIZE+1][150];
    int first;                      
    int last;                       
    int count;                      
    sem_t *lock;                    
} Queue;


init_queue(Queue *q, sem_t *l)
{
    q->first = 0;
    q->last = QUEUESIZE-1;
    q->count = 0;
    q->lock = l;
}

enqueue(Queue *q, char x[150])
{
    while(q->count == QUEUESIZE)
        sem_wait(q->lock);

    if (q->count == 0)
    {
        if (sem_post(q->lock) == -1)
        {
            printf("Thread failed to unlock semaphore\n");
        }
    }       
    q->last = (q->last+1) % QUEUESIZE;
    strcpy(q->q[ q->last ],x);    
    q->count = q->count + 1;
}

dequeue(Queue *q,char *ptr)
{
    char x[150];
    while(q->count == 0)
        sem_wait(q->lock);

    if (q->count == QUEUESIZE) 
    {
        if (sem_post(q->lock) == -1)
        {
            printf("Thread failed to unlock semaphore\n");
        }
    }   
    strcpy(ptr,q->q[ q->first]);
    q->first = (q->first+1) % QUEUESIZE;
    q->count = q->count - 1;
}

Answer 1:

As requested, here is my solution.

#define QUEUESIZE 50

typedef struct 
{           
    char q[QUEUESIZE][150];
    int first;                      
    int last;                       
    int count;                      
    sem_t *full;
    sem_t *empty;
    sem_t *excl;

} Queue;


void init_queue(Queue *q, sem_t *f, sem_t *e, sem_t *ee)
{
    q->first = 0;
    q->last = QUEUESIZE-1;
    q->count = 0;
    q->full = f;
    q->empty = e;
    q->excl = ee; 
}

void enqueue(Queue *q, char x[150])
{
    sem_wait(q->empty);
    sem_wait(q->excl);

    q->last = (q->last+1) % QUEUESIZE;
    strcpy(q->q[ q->last ],x);    
    q->count = q->count + 1;

    sem_post(q->excl);
    sem_post(q->full);
}

void dequeue(Queue *q,char *ptr)
{
    sem_wait(q->full);
    sem_wait(q->excl);

    strcpy(ptr,q->q[ q->first]);
    q->first = (q->first+1) % QUEUESIZE;
    q->count = q->count - 1;

    sem_post(q->excl);
    sem_post(q->empty);
}

I initialize the semaphores as follows:

sem_init(full,1,0);            /* no filled slots yet */
sem_init(empty,1,QUEUESIZE);   /* one count per free slot */
sem_init(excl,1,1);            /* the single mutual-exclusion semaphore passed to init_queue */
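
To show how the pieces above might fit together, here is a minimal single-process sketch with one producer thread and one consumer. It is an illustration under stated assumptions, not the program this answer was taken from: the semaphores are embedded in the struct and created with pshared set to 0 (threads rather than processes), QUEUESIZE is shrunk to 4 so that the producer really has to block, and NITEMS, producer and run_semaphore_demo are hypothetical names.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <string.h>

#define QUEUESIZE 4     /* assumption: small so the producer must block */
#define ITEMLEN   150
#define NITEMS    20    /* hypothetical number of test messages */

typedef struct {
    char  q[QUEUESIZE][ITEMLEN];
    int   first, last, count;
    sem_t full;    /* counts filled slots, starts at 0 */
    sem_t empty;   /* counts free slots, starts at QUEUESIZE */
    sem_t excl;    /* binary semaphore guarding the indices */
} Queue;

static void init_queue(Queue *q)
{
    q->first = 0;
    q->last  = QUEUESIZE - 1;
    q->count = 0;
    sem_init(&q->full,  0, 0);   /* pshared = 0: threads of one process */
    sem_init(&q->empty, 0, QUEUESIZE);
    sem_init(&q->excl,  0, 1);
}

static void enqueue(Queue *q, const char *x)
{
    sem_wait(&q->empty);         /* sleeps while the queue is full */
    sem_wait(&q->excl);
    q->last = (q->last + 1) % QUEUESIZE;
    snprintf(q->q[q->last], ITEMLEN, "%s", x);
    q->count++;
    sem_post(&q->excl);
    sem_post(&q->full);          /* wakes a sleeping consumer, if any */
}

static void dequeue(Queue *q, char *out)
{
    sem_wait(&q->full);          /* sleeps while the queue is empty */
    sem_wait(&q->excl);
    strcpy(out, q->q[q->first]);
    q->first = (q->first + 1) % QUEUESIZE;
    q->count--;
    sem_post(&q->excl);
    sem_post(&q->empty);         /* wakes a sleeping producer, if any */
}

static void *producer(void *arg)
{
    Queue *q = arg;
    char msg[ITEMLEN];
    for (int i = 0; i < NITEMS; i++) {
        snprintf(msg, sizeof msg, "item %d", i);
        enqueue(q, msg);
    }
    return NULL;
}

/* Returns how many items arrived in FIFO order, or -1 on thread failure. */
int run_semaphore_demo(void)
{
    static Queue q;
    pthread_t tid;
    char got[ITEMLEN], want[ITEMLEN];
    int in_order = 0;

    init_queue(&q);
    if (pthread_create(&tid, NULL, producer, &q) != 0)
        return -1;
    for (int i = 0; i < NITEMS; i++) {
        dequeue(&q, got);
        snprintf(want, sizeof want, "item %d", i);
        if (strcmp(got, want) == 0)
            in_order++;
    }
    pthread_join(tid, NULL);
    assert(q.count == 0);        /* everything produced was consumed */
    sem_destroy(&q.full);
    sem_destroy(&q.empty);
    sem_destroy(&q.excl);
    return in_order;
}
```

Calling run_semaphore_demo() from main (built with something like cc -pthread) should return NITEMS if every message arrives in order. Neither side spins: a thread that finds the queue full or empty sleeps inside sem_wait until the other side posts.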


Answer 2:

As you recognize in your example using semaphores, you need some support from your operating system to accomplish this. If you are on an operating system that supports POSIX Message Queues, you can just rely on that. Otherwise, you can use pthread condition variables as the basis for your implementation.

The trick is that you need two condition variables, one for the full wait state and one for the empty wait state. The implementation is simple, but it is difficult to cover the corner cases and even harder to test well.
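
A minimal sketch of the two-condition-variable idea might look like the following. It is not the apr_queue code, only an illustration under stated assumptions: CvQueue, cvq_init, cvq_push, cvq_pop, cv_producer and run_condvar_demo are hypothetical names, the capacity is fixed at 4, and error checking of the pthread calls is omitted for brevity.

```c
#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define CAPACITY 4      /* assumption: small so the producer must block */
#define ITEMLEN  150

typedef struct {
    char items[CAPACITY][ITEMLEN];
    int  head, count;
    pthread_mutex_t lock;
    pthread_cond_t  not_empty;   /* consumers wait here while count == 0 */
    pthread_cond_t  not_full;    /* producers wait here while count == CAPACITY */
} CvQueue;

void cvq_init(CvQueue *q)
{
    q->head = q->count = 0;
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->not_empty, NULL);
    pthread_cond_init(&q->not_full, NULL);
}

void cvq_push(CvQueue *q, const char *s)
{
    pthread_mutex_lock(&q->lock);
    /* The loop re-checks the predicate after each wakeup; the thread
       sleeps inside pthread_cond_wait, so this is not a busy wait. */
    while (q->count == CAPACITY)
        pthread_cond_wait(&q->not_full, &q->lock);
    int tail = (q->head + q->count) % CAPACITY;
    snprintf(q->items[tail], ITEMLEN, "%s", s);
    q->count++;
    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->lock);
}

void cvq_pop(CvQueue *q, char *out)
{
    pthread_mutex_lock(&q->lock);
    while (q->count == 0)
        pthread_cond_wait(&q->not_empty, &q->lock);
    strcpy(out, q->items[q->head]);
    q->head = (q->head + 1) % CAPACITY;
    q->count--;
    pthread_cond_signal(&q->not_full);
    pthread_mutex_unlock(&q->lock);
}

static void *cv_producer(void *arg)
{
    CvQueue *q = arg;
    char msg[ITEMLEN];
    for (int i = 0; i < 10; i++) {
        snprintf(msg, sizeof msg, "%d", i);
        cvq_push(q, msg);
    }
    return NULL;
}

/* Consumes the ten numbers sent by cv_producer and returns their sum,
   or -1 if the producer thread could not be started. */
int run_condvar_demo(void)
{
    CvQueue q;
    pthread_t tid;
    char msg[ITEMLEN];
    int sum = 0;

    cvq_init(&q);
    if (pthread_create(&tid, NULL, cv_producer, &q) != 0)
        return -1;
    for (int i = 0; i < 10; i++) {
        cvq_pop(&q, msg);
        sum += atoi(msg);
    }
    pthread_join(tid, NULL);
    assert(q.count == 0);
    pthread_cond_destroy(&q.not_empty);
    pthread_cond_destroy(&q.not_full);
    pthread_mutex_destroy(&q.lock);
    return sum;
}
```

The while loops around pthread_cond_wait are one of the corner cases: condition variables can wake spuriously, so the predicate has to be re-checked while the mutex is held. Calling run_condvar_demo() from main (built with something like cc -pthread) should return 45, the sum of 0 through 9.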

I've prepared an example which is the time-honored Apache apr_queue implementation but with the dependencies stripped down to only pthreads: https://github.com/chrismerck/rpa_queue