Reusable barrier implementation using POSIX semaph

2019-08-17 07:16发布

问题:

Need a solution that creates 5 pthreads. Each pthread executes a function that involves iterating through a loop 10 times. In each iteration of the loop, a thread increments an int from 0 to 0.9*MAX_INT and then prints the iteration number. Make sure that each of the 5 threads finish the ith iteration of the loop before they can start the (i+1)th iteration (i.e. all threads synchronize/rendezvous towards the end of each iteration). I need to use a two-phase barrier implemented using POSIX semaphores to enforce the synchronization constraint

I wrote following code am I correct ?

#include <stdio.h>

#include <stdlib.h>

#include <pthread.h>

int thread_count;

void* MyThread(void* rank);

int main()

{

  long thread;

   pthread_t* thread_handles;

   thread_count = 5;

   thread_handles = malloc (thread_count*sizeof(pthread_t));

   for (thread = 0; thread < thread_count; thread++)

       pthread_create(&thread_handles[thread],NULL,MyThread,(void*) thread);

   for (thread = 0; thread < thread_count; thread++)

       pthread_join(thread_handles[thread], NULL);

   free(thread_handles);

   return 0;

}

void* Hello(void* rank)

{

    long my_rank = (long) rank;

    int a,i;

    a=0;

    for(i=0;i<10;i++)

    {

          int n = 5;
          int count = 0;

          pthread_mutex_t mutex = Semaphore(1)

          barrier = Semaphore(0)

          a = a + 0.9*MAX_INT;

          printf("this is %d iteration\n",i);

          mutex.wait()

          count = count + 1

          mutex.signal()

          if count == n: barrier.signal() # unblock ONE thread

          barrier.wait()

          barrier.signal()

   }

}

Edit:

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <time.h>
#include <semaphore.h>

typedef struct {
  int n;
  int count;
  sem_t mutex;
  sem_t turnstyle;
  sem_t turnstyle2;
} barrier_t;

void init_barrier(barrier_t *barrier, int n)
{
  barrier->n = n;
  barrier->count = 0;
  sem_init(&barrier->mutex, 0, 1);
  sem_init(&barrier->turnstyle, 0, 0);
  sem_init(&barrier->turnstyle2, 0, 0);
}

void phase1_barrier(barrier_t *barrier)
{
  sem_wait(&barrier->mutex);
  if (++barrier->count == barrier->n) {
    int i;
    for (i = 0; i < barrier->n; i++) {
      sem_post(&barrier->turnstyle);
    }
  }
  sem_post(&barrier->mutex);
  sem_wait(&barrier->turnstyle);
}

void phase2_barrier(barrier_t *barrier)
{
  sem_wait(&barrier->mutex);
  if (--barrier->count == 0) {
    int i;
    for (i = 0; i < barrier->n; i++) {
      sem_post(&barrier->turnstyle2);
    }
  }
  sem_post(&barrier->mutex);
  sem_wait(&barrier->turnstyle2);
}

void wait_barrier(barrier_t *barrier)
{
  phase1_barrier(barrier);
  phase2_barrier(barrier);
}

#define NUM_THREADS 5

void *myThread(void *);

int main(int argc, char **argv)
{
  pthread_t threads[NUM_THREADS];
  barrier_t barrier;
  int i;

  init_barrier(&barrier, NUM_THREADS);

  for (i = 0; i < NUM_THREADS; i++) {
    pthread_create(&threads[i], NULL, myThread, &barrier);
  }

  for (i = 0; i < NUM_THREADS, i++) {
    pthread_join(threads[i], NULL);
  }

  return 0;
}

void *myThread(void *arg)
{
      barrier_t *barrier = arg;
      int i,a;

        for(i=0;i<10;i++)

            {
                a = a + 0.9*MAX_INT;

                printf("this is %d iteration\n",i);
            }
  return NULL;
}

回答1:

OK, if we examine the Barrier object in, "The Little Book of Semaphores", section 3.7.7, we see that we need a mutex and 2 semaphores called turnstile and turnstile2 (a mutex can be a semaphore the is initialized to 1).

Since we have to use POSIX semaphores, pthreads, and INT_MAX, we start by including the requisite header files:

#include <pthread.h>
#include <semaphore.h>
#include <limits.h>

The book makes the Barrier an object; however, in C, we don't really have objects, but we can create a struct with some functions to operate on it:

typedef struct {
  int n;
  int count;
  sem_t mutex;
  sem_t turnstyle;
  sem_t turnstyle2;
} barrier_t;

We can create a function to initialize a barrier:

void init_barrier(barrier_t *barrier, int n)
{
  barrier->n = n;
  barrier->count = 0;
  sem_init(&barrier->mutex, 0, 1);
  sem_init(&barrier->turnstile, 0, 0);
  sem_init(&barrier->turnstile2, 0, 0);
}

And implement the phase1 function, as in the book:

void phase1_barrier(barrier_t *barrier)
{
  sem_wait(&barrier->mutex);
  if (++barrier->count == barrier->n) {
    int i;
    for (i = 0; i < barrier->n; i++) {
      sem_post(&barrier->turnstile);
    }
  }
  sem_post(&barrier->mutex);
  sem_wait(&barrier->turnstile);
}

Note that the sem_post function only posts a single time, so a loop is required to post the turnstile n times.

The phase2 function also follows directly in the same manner:

void phase2_barrier(barrier_t *barrier)
{
  sem_wait(&barrier->mutex);
  if (--barrier->count == 0) {
    int i;
    for (i = 0; i < barrier->n; i++) {
      sem_post(&barrier->turnstile2);
    }
  }
  sem_post(&barrier->mutex);
  sem_wait(&barrier->turnstile2);
}

Finally, we can implement the wait function:

void wait_barrier(barrier_t *barrier)
{
  phase1_barrier(barrier);
  phase2_barrier(barrier);
}

Now, in your main function, you can allocate and initialize a barrier, and pass it to your spawned threads:

#define NUM_THREADS 5

void *myThread(void *);

int main(int argc, char **argv)
{
  pthread_t threads[NUM_THREADS];
  barrier_t barrier;
  int i;

  init_barrier(&barrier, NUM_THREADS);

  for (i = 0; i < NUM_THREADS; i++) {
    pthread_create(&threads[i], NULL, myThread, &barrier);
  }

  for (i = 0; i < NUM_THREADS, i++) {
    pthread_join(threads[i], NULL);
  }

  return 0;
}

And finally, implement the thread:

void *myThread(void *arg)
{
  barrier_t *barrier = arg;
  int i;
  int a;

  for (i = 0; i < 10; i++) {
    for (a = 0; a < 0.9*INT_MAX; a++);
    printf("this is %d iteration\n", i);
    wait_barrier(barrier);
  }

  return NULL;
}