Synchronize threads for pthread_cond_broadcast call

Posted 2019-05-13 23:51

Question:

I have a simple application with a "manager" thread that spawns ten simple "worker" threads. I want all of the "worker" threads to block on the same condition variable (i.e., a condvar), and I want to manually signal all ten threads to wake up at the same time with a pthread_cond_broadcast call.

In the case of my application, it is possible for threads to suffer an error condition and terminate early, so it is possible that not all ten threads make it to the synchronization point.

One simple mechanism would be to create a pthread_barrier_t and have all ten threads call pthread_barrier_wait, and when all ten threads complete this call, they are all free to continue execution. However, this would require the threads being able to modify the number of threads the barrier requires to unblock. I don't know if this can be safely modified.

Additionally, I don't want the still-working threads to start automatically, as they would with a barrier; I want to start them manually with a pthread_cond_broadcast call instead. How can I guarantee that all the threads that are still alive (ideally ten) have blocked on the condvar before I make the broadcast call?

Thanks!

Answer 1:

The following shows one way to do it, using a condition variable and a few other variables, though there may be better ways. The comments should show how it works. You'd have to modify things to suit your actual situation, of course; for instance, there might be loops involved.

#include <pthread.h>

int activeThreads = 0;   /* number of threads currently going */
int waitingThreads = 0;  /* number of threads waiting on the condition */
int readyFlag = 0;       /* flag to tell the threads to proceed when signaled */
pthread_cond_t  cond;    /* condition to wait on / signal */
pthread_mutex_t mtx;     /* mutex for the above */

pthread_cond_t  condWaiting; /* EDIT: additional condition variable to signal 
                              * when each thread starts waiting */

void *threadFunc(void *arg)
{
  /* Edit: Rather than incrementing 'activeThreads' here, it should be done
   * in the main thread when each thread is created (to avoid a race) */

  /* ...do stuff... */

  /* When the threads should wait, do this (they wait for 'readyFlag' to be 
   * set, but also adjust the waiting thread count so the main thread can
   * determine whether to broadcast) */
  pthread_mutex_lock(&mtx);
    if (readyFlag == 0) {
      waitingThreads++;
      do {
        pthread_cond_signal(&condWaiting); /* EDIT: signal the main thread when
                                            * a thread begins waiting */
        pthread_cond_wait(&cond,&mtx);
      } while (readyFlag == 0);
      waitingThreads--;
    }
  pthread_mutex_unlock(&mtx);

  /* ...more stuff... */

  /* When threads terminate, they decrement the active thread count */
  pthread_mutex_lock(&mtx);
    activeThreads--;
    pthread_cond_signal(&condWaiting); /* EDIT: also signal the main thread
                                        * when a thread exits to make it 
                                        * recheck the waiting thread count if
                                        * waiting for all threads to wait */
  pthread_mutex_unlock(&mtx);

  return NULL;
}

int main(int argc, char *argv[])
{
  /* Edit: Showing some code to initialize the mutex, condition variable(s),
   * etc., and create some threads -- modify as needed */
  pthread_mutex_init(&mtx,NULL);
  pthread_cond_init(&cond,NULL);
  pthread_cond_init(&condWaiting,NULL); /* EDIT: if main thread should block
                                         * until all threads are waiting */
  activeThreads = waitingThreads = readyFlag = 0;

  /* Edit: Increment 'activeThreads' here rather than in the thread function,
   * to avoid a race (if the main thread started waiting for the others
   * when not all had incremented the count yet, the main thread might end
   * up waiting for fewer threads to be ready -- though it's unlikely) */
  #define NUM_THREADS 10
  pthread_t workers[NUM_THREADS];
  for (int i = 0; i < NUM_THREADS; i++) {
    /* should use appropriate thread attributes */
    if (pthread_create(&workers[i],NULL,threadFunc,NULL) == 0)
      activeThreads++;
  }

  /* ...do stuff... */

  /* Set 'readyFlag' and do condition broadcast IF all threads are waiting,
   * or just carry on if they aren't */
  pthread_mutex_lock(&mtx);
    if ((activeThreads != 0) && (activeThreads == waitingThreads)) {
      readyFlag = 1;
      pthread_cond_broadcast(&cond);
    }
  pthread_mutex_unlock(&mtx);

  /* EDIT: OR.. to wait until all threads are waiting and then broadcast, do 
   * this instead: */
  pthread_mutex_lock(&mtx);
    while (waitingThreads < activeThreads) { /* wait on 'condWaiting' until all
                                              * active threads are waiting */
      pthread_cond_wait(&condWaiting,&mtx);
    }
    if (waitingThreads != 0) {
      readyFlag = 1;
      pthread_cond_broadcast(&cond);
    }
  pthread_mutex_unlock(&mtx);

  /* ...more stuff... */

  /* If needed, you can clear the flag when NO threads are waiting.. */
  pthread_mutex_lock(&mtx);
    if (waitingThreads == 0)
      readyFlag = 0;
  pthread_mutex_unlock(&mtx);

  /* ...even more stuff... */

  return 0;
}

I'd add, though, that I don't see when there'd be a good reason to do this rather than just protecting resources in a more straightforward way.

EDIT: Added some things to the code, showing a second condition variable used to let the main thread wait for all the workers to be ready. The changed parts are marked with "EDIT:" in comments, and can be left out if not needed. I've also corrected a race condition by moving the increment of activeThreads out of the thread function, and shown the initialization for the mutex, etc. (without error handling).



Answer 2:

Generally speaking, you should just set the flag associated with the condition variable and broadcast when the work is ready to go; there's usually no need to wait for the threads to block on the condition variable. If they're 'late', they'll just notice that the flag is already set and not bother blocking.
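A minimal sketch of that simpler approach, assuming ten workers as in the question. The names `gate_mtx`, `gate_cond`, `gate_open`, `passed`, and `run_gate_demo` are made up for illustration, and the `assert` calls stand in for real error handling:

```c
#include <assert.h>
#include <pthread.h>

/* Hypothetical names; none of these come from the question. */
static pthread_mutex_t gate_mtx  = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  gate_cond = PTHREAD_COND_INITIALIZER;
static int gate_open = 0;   /* the flag associated with gate_cond */
static int passed    = 0;   /* demo only: workers that got past the gate */

static void *gate_worker(void *arg)
{
    (void)arg;
    pthread_mutex_lock(&gate_mtx);
    /* A "late" worker sees gate_open already set and never blocks. */
    while (!gate_open)
        pthread_cond_wait(&gate_cond, &gate_mtx);
    passed++;
    pthread_mutex_unlock(&gate_mtx);
    return NULL;
}

/* Starts up to n workers, opens the gate, joins them, and returns how
 * many workers got through. */
int run_gate_demo(int n)
{
    enum { MAX_WORKERS = 64 };
    pthread_t tids[MAX_WORKERS];
    int created = 0;

    if (n > MAX_WORKERS)
        n = MAX_WORKERS;

    pthread_mutex_lock(&gate_mtx);
    gate_open = 0;
    passed = 0;
    pthread_mutex_unlock(&gate_mtx);

    for (int i = 0; i < n; i++) {
        if (pthread_create(&tids[created], NULL, gate_worker, NULL) == 0)
            created++;
    }

    /* Open the gate without checking who is already waiting. */
    pthread_mutex_lock(&gate_mtx);
    gate_open = 1;
    pthread_cond_broadcast(&gate_cond);
    pthread_mutex_unlock(&gate_mtx);

    for (int i = 0; i < created; i++) {
        int rc = pthread_join(tids[i], NULL);
        assert(rc == 0);
        (void)rc;
    }
    return passed;
}
```

Because each worker tests the flag while holding the mutex, it does not matter whether a given worker reaches the wait before or after the broadcast.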

But if you really do need to wait until all worker threads are at the point where they're blocking on the condition variable, you can use a combination of condition variables: one that tracks how many threads are 'ready to go' and another that triggers them to start work. Some pseudo-code:

#include <pthread.h>

#define NUM_THREADS 10

// manager thread

pthread_cond_t pseudo_barrier;
pthread_cond_t pseudo_barrier_complete_cond;
pthread_mutex_t pseudo_barrier_mux;
int pseudo_barrier_counter = NUM_THREADS;
int pseudo_barrier_complete_flag = 0;

void thread_manager(void) 
{
    pthread_cond_init( &pseudo_barrier, NULL);
    pthread_cond_init( &pseudo_barrier_complete_cond, NULL);
    pthread_mutex_init( &pseudo_barrier_mux, NULL);


    for (int i = 0 ; i < NUM_THREADS; ++i) {
        pthread_create( /*... */);
    }

    // wait for threads to 'stage'
    pthread_mutex_lock( &pseudo_barrier_mux);
    while (pseudo_barrier_counter != 0) {
        pthread_cond_wait( &pseudo_barrier, &pseudo_barrier_mux);
    }
    pthread_mutex_unlock( &pseudo_barrier_mux);


    // at this point, all threads have either bailed out or are waiting to go
    // let 'em rip

    pthread_mutex_lock( &pseudo_barrier_mux);
    pseudo_barrier_complete_flag = 1;
    pthread_mutex_unlock( &pseudo_barrier_mux);
    pthread_cond_broadcast( &pseudo_barrier_complete_cond);

    // do whatever else the manager thread needs to do...
}


// worker threads
void* worker_thread(void* context)
{
    int error_result = 0;

    // whatever initialization...
    //  if this thread is going to bail out due to an error, it needs to 
    //  set the `error_result` value appropriately and still drop into the 
    //  following code

    // let the manager know that this thread is waiting (or isn't going to participate)
    pthread_mutex_lock( &pseudo_barrier_mux);
    --pseudo_barrier_counter;

    if (pseudo_barrier_counter == 0) {
        // all other threads are accounted for, let the manager know we're ready
        pthread_cond_signal( &pseudo_barrier);
    }

    // if this thread isn't going to continue because of some error, it's already 
    //  accounted for that fact in `pseudo_barrier_counter`, so we can return here
    //  without preventing the pseudo-barrier from being met.
    if (error_result != 0) {
        pthread_mutex_unlock( &pseudo_barrier_mux);
        return NULL;
    }

    // NOTE: we're still holding pseudo_barrier_mux, so the manager thread is still 
    //  blocked, even if we've signaled it - it'll have to wait until this 
    //  thread is blocking on `pseudo_barrier_complete_cond`

    while (!pseudo_barrier_complete_flag) {
        pthread_cond_wait( &pseudo_barrier_complete_cond, &pseudo_barrier_mux);
    }
    pthread_mutex_unlock( &pseudo_barrier_mux);


    // do the work...

    return NULL;
}

Of course, the pseudo-code presented should be cleaned up for any real use (including error handling), probably by packaging all the supporting condition variables, the mutex, and flags into a structure.
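One way that packaging might look, as a rough sketch rather than a finished design. The `start_gate` type and the functions `gate_init`, `gate_check_in`, `gate_release`, and `run_start_gate_demo` are hypothetical names introduced here. The `ran` counter exists only so the demo can report a result, and most return codes are left unchecked for brevity:

```c
#include <assert.h>
#include <pthread.h>

/* Hypothetical wrapper around the pseudo-barrier state. */
typedef struct {
    pthread_mutex_t mtx;
    pthread_cond_t  staged;   /* signaled when the last worker checks in */
    pthread_cond_t  go;       /* broadcast by the manager to start work */
    int pending;              /* workers that have not yet checked in */
    int go_flag;              /* set once the manager releases the workers */
    int ran;                  /* demo only: workers that got past the gate */
} start_gate;

typedef struct { start_gate *gate; int fail; } worker_arg;

static void gate_init(start_gate *g, int nworkers)
{
    pthread_mutex_init(&g->mtx, NULL);
    pthread_cond_init(&g->staged, NULL);
    pthread_cond_init(&g->go, NULL);
    g->pending = nworkers;
    g->go_flag = 0;
    g->ran = 0;
}

/* Worker side: check in, then either bail out or block until released. */
static void gate_check_in(start_gate *g, int bailing)
{
    pthread_mutex_lock(&g->mtx);
    if (--g->pending == 0)
        pthread_cond_signal(&g->staged);
    if (!bailing) {
        while (!g->go_flag)
            pthread_cond_wait(&g->go, &g->mtx);
        g->ran++;
    }
    pthread_mutex_unlock(&g->mtx);
}

/* Manager side: wait until every worker has checked in, then release. */
static void gate_release(start_gate *g)
{
    pthread_mutex_lock(&g->mtx);
    while (g->pending != 0)
        pthread_cond_wait(&g->staged, &g->mtx);
    g->go_flag = 1;
    pthread_cond_broadcast(&g->go);
    pthread_mutex_unlock(&g->mtx);
}

static void *demo_worker(void *p)
{
    worker_arg *a = p;
    gate_check_in(a->gate, a->fail);
    return NULL;
}

/* Runs nworkers threads, the first nfail of which bail out early, and
 * returns how many workers got past the gate. */
int run_start_gate_demo(int nworkers, int nfail)
{
    enum { MAX_WORKERS = 32 };
    pthread_t tids[MAX_WORKERS];
    worker_arg args[MAX_WORKERS];
    start_gate g;
    int created = 0;

    if (nworkers > MAX_WORKERS)
        nworkers = MAX_WORKERS;
    gate_init(&g, nworkers);

    for (int i = 0; i < nworkers; i++) {
        args[i].gate = &g;
        args[i].fail = (i < nfail);
        if (pthread_create(&tids[created], NULL, demo_worker, &args[i]) == 0) {
            created++;
        } else {
            /* A thread that never started must still be accounted for. */
            pthread_mutex_lock(&g.mtx);
            g.pending--;
            pthread_mutex_unlock(&g.mtx);
        }
    }

    gate_release(&g);

    for (int i = 0; i < created; i++) {
        int rc = pthread_join(tids[i], NULL);
        assert(rc == 0);
        (void)rc;
    }
    pthread_cond_destroy(&g.go);
    pthread_cond_destroy(&g.staged);
    pthread_mutex_destroy(&g.mtx);
    return g.ran;
}
```

Keeping the counter, the flag, and both condition variables behind one mutex in one struct means a worker's check-in and its wait happen under the same lock, so the manager cannot broadcast between the two steps.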