POSIX Threads, unique execution

2019-09-19 11:15发布

问题:

Is there a way to be sure only 1 thread executes a specific function?

for (i = startnotfirst; i < end; i++) {     

    gl_rand = (float) lrand48();

    fprintf(stderr, "\nTHREADING - #ID: %d - THIS IS MY RAND: %f", *mytid,rand);
    to_open = (rand / (float) INT_MAX) < (points->p[i].cost / z);
    if (to_open) {
        //printf("\nRANDOM: %f \nINT_MAX: %d \npointsDistance(cost): %f \nZ: %f \n\n RESULT: %f < %f\n\n\n",rand ,INT_MAX,points->p[1].cost , z,(rand / (float) INT_MAX),(points->p[i].cost / z));
        fprintf(stderr, "\nTHREADING - #ID: %d - im working...", *mytid);
        t_kcenter++;
        for (int k = start; k < end; k++) {
            float distance = dist(points->p[i], points->p[k], points->dim);
            //If distance is smaller add it to that cluster.
            float new_cost = distance * points->p[k].weight;
            if (new_cost < points->p[k].cost) {
                points->p[k].cost = new_cost;
                points->p[k].assign = i;
            }
        }
    }
}

I want the gl_rand to be executed by only one thread and others know about the new rand value either by changing a global variable or broadcasting.

Also the threads must all have ended their job iteration body before a new number comes out!

Any ideas? Thanks!

回答1:

pthread_cond_wait() and pthread_cond_broadcast()

In conjunction with pthread_mutex_lock() pthread_mutex_unlock(), the pthread_cond_wait() and pthread_cond_broadcast() functions are the key to achieving the required functionality. However, considerable care is required to get the predicate right.

/*
** Objective: N threads cooperate on M cycles or iterations of a task.
** A new random number is needed for each cycle, but all threads must
** use the same random number on each cycle.
** Any thread may evaluate the new random number.
*/

#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include "stderr.h"

#ifndef NUM_THREADS
#define NUM_THREADS 3
#endif
#ifndef NUM_CYCLES
#define NUM_CYCLES 5
#endif

enum { MAX_THREADS = NUM_THREADS };
enum { MAX_CYCLES  = NUM_CYCLES  };

static pthread_mutex_t mtx_waiting = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cnd_waiting = PTHREAD_COND_INITIALIZER;
static int             num_waiting = 0;
static int             cycle   = -1;

static float gl_rand = 0;
static long  gl_long = 0;

static float next_iteration_random_number(int tid, int iteration)
{
    pthread_mutex_lock(&mtx_waiting);
    assert(cycle == iteration || cycle == iteration - 1);
    num_waiting++;
    printf("-->> TID %d, I = %d (C = %d, W = %d)\n",
           tid, iteration, cycle, num_waiting);
    while (cycle != iteration && num_waiting != MAX_THREADS)
    {
        assert(num_waiting > 0 && num_waiting <= MAX_THREADS);
        printf("-CW- TID %d, I = %d (C = %d, W = %d)\n",
               tid, iteration, cycle, num_waiting);
        pthread_cond_wait(&cnd_waiting, &mtx_waiting);
    }
    assert(cycle == iteration || num_waiting == MAX_THREADS);
    printf("---- TID %d, I = %d (C = %d, W = %d)\n",
           tid, iteration, cycle, num_waiting);

    if (cycle != iteration)
    {
        gl_long = lrand48();
        gl_rand = (float)gl_long;
        num_waiting = 0;
        cycle = iteration;
        printf("---- TID %d generates cycle %d: L = %ld, F = %g\n",
               tid, cycle, gl_long, gl_rand);
        pthread_cond_broadcast(&cnd_waiting);
    }

    printf("<<-- TID %d, I = %d (C = %d, W = %d) L = %ld, F = %g\n",
           tid, iteration, cycle, num_waiting, gl_long, gl_rand);
    pthread_mutex_unlock(&mtx_waiting);
    return gl_rand;
}

static void *thread_function(void *vp)
{
    int tid = (int)(uintptr_t)vp;     // Thuggish!

    for (int i = 0; i < MAX_CYCLES; i++)
    {
        float f = next_iteration_random_number(tid, i);
        printf("TID %d at work: I = %d, F = %g\n", tid, i, f);
        fflush(stdout);
        struct timespec rq;
        rq.tv_sec = 0;
        rq.tv_nsec = (((gl_long & 0xFF) + (0xF * tid))) % 200 * 50000000;
        assert(rq.tv_nsec >= 0 && rq.tv_nsec < 10000000000);
        nanosleep(&rq, 0);
    }

    return 0;
}

int main(int argc, char **argv)
{
    err_setarg0(argv[0]);
    assert(argc == 1);

    pthread_t thread[MAX_THREADS];

    for (int i = 0; i < MAX_THREADS; i++)
    {
        int rc = pthread_create(&thread[i], 0, thread_function, (void *)(uintptr_t)i);
        if (rc != 0)
        {
            errno = rc;
            err_syserr("failed to create thread %d", i);
        }
    }

    for (int i = 0; i < MAX_THREADS; i++)
    {
        void *vp;
        int rc = pthread_join(thread[i], &vp);
        if (rc != 0)
        {
            errno = rc;
            err_syserr("Failed to join TID %d", i);
        }
        printf("TID %d returned %p\n", i, vp);
    }

    return 0;
}

Source at GitHub. Library code in libsoq. The routines starting err_ are declared in header stderr.h and the supporting code is in stderr.c. These greatly simplify error reporting.

The main() function is trivial; it launches 5 threads, and then waits to join 5 threads. The only tricky bit is relaying the thread number as an argument to the thread function. The code converts the integer value into a uintptr_t, then coerces that to a void *; the called function undoes this sequence

The per-thread function is not complex. It collects the thread number from its argument, then enters into a loop to iterate through the required number of cycles. On each iteration, it passes the iteration number (and the thread number) to next_iteration_random_number(), which coordinates the random number generation. The thread prints the data, then uses nanosleep() to sleep for a not easily discernible amount of time that is under 1 second.

The interesting code is in next_iteration_random_number(). First, the thread locks the mutex that controls the shared information. It then increments the number of waiting threads. If the current cycle is not the same as iteration it is dealing with and the number of waiting threads is not equal to the total number of threads, then this thread calls pthread_cond_wait() to sleep until broadcast to. When the thread gets out of the loop (either by waking up after a broadcast, or because it never entered the loop), the thread looks to see if the current cycle is the iteration it is expecting. If not, it must be the last thread to try for this cycle — so it generates the random number and does the housekeeping to ensure that the other threads will know what is happening, and then it uses pthread_cond_broadcast() to signal all the sleeping threads to awaken. They can't actually wake yet; the current thread still holds the mutex. It reports what it is doing and unlocks the mutex, and returns the random number. The other threads detect that the cycle number is the same as the iteration they were waiting for, so they can each print the information, unlock the mutex, and return the number.

The key point is that each thread must know which iteration it is expecting to work on, and must sleep appropriately when that is not yet the current iteration. There are assertions and printing operations aplenty in the code to help understand what's going on.

Wholly separately, since lrand48() returns a long (albeit with a value in the range [0,231), so it is a 32-bit value even when long is a 64-bit type), which means there's a very good chance that it returns values which cannot be represented accurately in a float, so the gl_rand = (float)lrand48(); code snipped from the question is dubious. (The sample code also records the long value in gl_long and uses that on occasion.)

The code compiles cleanly on a Mac running macOS Sierra 10.12.3 with GCC 6.3.0 and compilation options set to fussy:

$ gcc -O3 -g -I../../inc -std=c11 -Wall -Wextra -Werror -Wmissing-prototypes \
>     -Wstrict-prototypes -Wold-style-definition pthrd37.c -o pthrd37 \
>     -L../../lib -lsoq 
$

Sample run

Program configured with 3 threads and 5 cycles:

-->> TID 0, I = 0 (C = -1, W = 1)
-CW- TID 0, I = 0 (C = -1, W = 1)
-->> TID 2, I = 0 (C = -1, W = 2)
-CW- TID 2, I = 0 (C = -1, W = 2)
-->> TID 1, I = 0 (C = -1, W = 3)
---- TID 1, I = 0 (C = -1, W = 3)
---- TID 1 generates cycle 0: L = 851401618, F = 8.51402e+08
<<-- TID 1, I = 0 (C = 0, W = 0) L = 851401618, F = 8.51402e+08
TID 1 at work: I = 0, F = 8.51402e+08
---- TID 0, I = 0 (C = 0, W = 0)
<<-- TID 0, I = 0 (C = 0, W = 0) L = 851401618, F = 8.51402e+08
TID 0 at work: I = 0, F = 8.51402e+08
---- TID 2, I = 0 (C = 0, W = 0)
<<-- TID 2, I = 0 (C = 0, W = 0) L = 851401618, F = 8.51402e+08
TID 2 at work: I = 0, F = 8.51402e+08
-->> TID 1, I = 1 (C = 0, W = 1)
-CW- TID 1, I = 1 (C = 0, W = 1)
-->> TID 0, I = 1 (C = 0, W = 2)
-CW- TID 0, I = 1 (C = 0, W = 2)
-->> TID 2, I = 1 (C = 0, W = 3)
---- TID 2, I = 1 (C = 0, W = 3)
---- TID 2 generates cycle 1: L = 1804928587, F = 1.80493e+09
<<-- TID 2, I = 1 (C = 1, W = 0) L = 1804928587, F = 1.80493e+09
TID 2 at work: I = 1, F = 1.80493e+09
---- TID 1, I = 1 (C = 1, W = 0)
<<-- TID 1, I = 1 (C = 1, W = 0) L = 1804928587, F = 1.80493e+09
TID 1 at work: I = 1, F = 1.80493e+09
---- TID 0, I = 1 (C = 1, W = 0)
<<-- TID 0, I = 1 (C = 1, W = 0) L = 1804928587, F = 1.80493e+09
TID 0 at work: I = 1, F = 1.80493e+09
-->> TID 2, I = 2 (C = 1, W = 1)
-CW- TID 2, I = 2 (C = 1, W = 1)
-->> TID 1, I = 2 (C = 1, W = 2)
-CW- TID 1, I = 2 (C = 1, W = 2)
-->> TID 0, I = 2 (C = 1, W = 3)
---- TID 0, I = 2 (C = 1, W = 3)
---- TID 0 generates cycle 2: L = 758783491, F = 7.58783e+08
<<-- TID 0, I = 2 (C = 2, W = 0) L = 758783491, F = 7.58783e+08
TID 0 at work: I = 2, F = 7.58783e+08
---- TID 2, I = 2 (C = 2, W = 0)
<<-- TID 2, I = 2 (C = 2, W = 0) L = 758783491, F = 7.58783e+08
TID 2 at work: I = 2, F = 7.58783e+08
---- TID 1, I = 2 (C = 2, W = 0)
<<-- TID 1, I = 2 (C = 2, W = 0) L = 758783491, F = 7.58783e+08
TID 1 at work: I = 2, F = 7.58783e+08
-->> TID 0, I = 3 (C = 2, W = 1)
-CW- TID 0, I = 3 (C = 2, W = 1)
-->> TID 2, I = 3 (C = 2, W = 2)
-CW- TID 2, I = 3 (C = 2, W = 2)
-->> TID 1, I = 3 (C = 2, W = 3)
---- TID 1, I = 3 (C = 2, W = 3)
---- TID 1 generates cycle 3: L = 959030623, F = 9.59031e+08
<<-- TID 1, I = 3 (C = 3, W = 0) L = 959030623, F = 9.59031e+08
TID 1 at work: I = 3, F = 9.59031e+08
-->> TID 1, I = 4 (C = 3, W = 1)
-CW- TID 1, I = 4 (C = 3, W = 1)
---- TID 0, I = 3 (C = 3, W = 1)
<<-- TID 0, I = 3 (C = 3, W = 1) L = 959030623, F = 9.59031e+08
TID 0 at work: I = 3, F = 9.59031e+08
---- TID 2, I = 3 (C = 3, W = 1)
<<-- TID 2, I = 3 (C = 3, W = 1) L = 959030623, F = 9.59031e+08
TID 2 at work: I = 3, F = 9.59031e+08
-->> TID 0, I = 4 (C = 3, W = 2)
-CW- TID 0, I = 4 (C = 3, W = 2)
-->> TID 2, I = 4 (C = 3, W = 3)
---- TID 2, I = 4 (C = 3, W = 3)
---- TID 2 generates cycle 4: L = 684387517, F = 6.84388e+08
<<-- TID 2, I = 4 (C = 4, W = 0) L = 684387517, F = 6.84388e+08
TID 2 at work: I = 4, F = 6.84388e+08
---- TID 1, I = 4 (C = 4, W = 0)
<<-- TID 1, I = 4 (C = 4, W = 0) L = 684387517, F = 6.84388e+08
TID 1 at work: I = 4, F = 6.84388e+08
---- TID 0, I = 4 (C = 4, W = 0)
<<-- TID 0, I = 4 (C = 4, W = 0) L = 684387517, F = 6.84388e+08
TID 0 at work: I = 4, F = 6.84388e+08
TID 0 returned 0x0
TID 1 returned 0x0
TID 2 returned 0x0

Notice that each thread happens to generate the random number at least once. That's not something that can be guaranteed, but it shows that no one thread is privileged.

pthread_once()

A first version of this answer mentioned and illustrated the pthread_once() call. This was not what was actually needed, but it could be useful in other contexts.

Assuming you have a sufficiently POSIX-compliant system, if you want only one thread to do something, but it does not matter which thread does it, then I believe you're looking for pthread_once().

#include <pthread.h>
#include <stdlib.h>

static pthread_once_t once_only = PTHREAD_ONCE_INIT;

static float gl_rand = 0;

static void pt_once(void)
{
    gl_rand = (float)lrand48();
}

void *thread_function(void *vp)
{
     pthread_once(&once_only, pt_once);
     …rest of the code…
     return 0;
}