read same structure on all threads

2019-06-06 01:41发布

问题:

I want all threads to read from the same structure. I did it in the past by adding threads inside the loop which reads from the structure but this time I need the structure to be opened inside void "dowork" as my example shows.

I have the following code:

struct word_list {
    char word[20];
    struct word_list * next;
};

struct word_list * first_word = NULL;
//other function which loads into struct is missing cause it's not relevant


//in main()
pthread_t thread_id[MAX_THREADS];
int max_thread = 10;
for(t = 0 ; t < max_thread; t++)
{
    pthread_mutex_lock(&thrd_list);
    arg_struct *args = calloc(1, sizeof(*args));
    args->file = file;
    args->t = t;
    args->e = ex;
    pthread_mutex_unlock(&thrd_list);
    if(pthread_create(&thread_id[t],NULL,dowork,args) != 0)
    {
        t--;
        fprintf(stderr,RED "\nError in creating thread\n" NONE);
    }
}

for(t = 0 ; t < max_thread; t++)
    if(pthread_join(thread_id[t],NULL) != 0)
    {
        fprintf(stderr,RED "\nError in joining thread\n" NONE);
    }




void *dowork(void *arguments)
{
    struct word_list * curr_word = first_word;
    char myword[20];
    while( curr_word != NULL )
    {
        pthread_mutex_lock(&thrd_list);
        strncpy(myword,curr_word->word,sizeof(myword) - 1);
        pthread_mutex_unlock(&thrd_list);

        //some irrelevant code is missing

        pthread_mutex_lock(&thrd_list);
        curr_word = curr_word->next;
        pthread_mutex_unlock(&thrd_list);
    }

}

How can I read different elements from the same structure in all threads?

回答1:

If I understand your requirements now (and I think I finally do), you need to treat your word list as a work queue. To do that requires a notification mechanism that allows the "pusher" of items into the queue to inform the "pullers" that new data is available. Such a system does exist in pthreads: the marriage of a condition variable, a mutex, and the predicate(s) they manage for control flow.

This is one example of how to use it. I've tried to document what is going on at each step for you, and hopefully you will understand.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

// defined the number of threads in our queue and the number
//  of test items for this demonstration.
#define MAX_THREADS  16
#define MAX_ITEMS    128*1024

typedef struct word_list
{
    char word[20];
    struct word_list * next;

} word_list;

// predicate values for the word list
struct word_list * first_word = NULL;   // current word.
int word_shutdown = 0;                  // shutdown state

// used for protecting our list.
pthread_mutex_t wq_mtx = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t wq_cv = PTHREAD_COND_INITIALIZER;

// worker proc
void *dowork(void*);

int main()
{
    pthread_t thread_id[MAX_THREADS];
    int i=0;

    // start thread pool
    for(i=0; i < MAX_THREADS; ++i)
        pthread_create(thread_id+i, NULL, dowork, NULL);

    // add MAX_ITEMS more entries, we need to latch since the
    //  work threads are actively processing the queue as we go.
    for (i=0;i<MAX_ITEMS;++i)
    {
        word_list *node = malloc(sizeof(*node));
        sprintf(node->word, "Word-%d", i);

        // latch before updating the queue head.
        pthread_mutex_lock(&wq_mtx);
        node->next = first_word;
        first_word = node;

        // no longer need the latch. unlock and inform any
        // potential waiter.
        pthread_mutex_unlock(&wq_mtx);
        pthread_cond_signal(&wq_cv);
    }

    // wait for the condition that the queue is empty
    pthread_mutex_lock(&wq_mtx);
    while (first_word != NULL)
        pthread_cond_wait(&wq_cv, &wq_mtx);
    pthread_mutex_unlock(&wq_mtx);

    // queue is empty, but threads are all still there waiting. So
    //  do it again, just to proves the pool is still intact.
    for (i=0;i<MAX_ITEMS;++i)
    {
        word_list *node = malloc(sizeof(*node));
        sprintf(node->word, "Word-%d", i);

        // latch before updating the queue head.
        pthread_mutex_lock(&wq_mtx);
        node->next = first_word;
        first_word = node;

        // no longer need the latch. unlock and inform any
        // potential waiter.
        pthread_mutex_unlock(&wq_mtx);
        pthread_cond_signal(&wq_cv);
    }

    // again wait for the condition that the queue is empty
    pthread_mutex_lock(&wq_mtx);
    while (first_word != NULL)
        pthread_cond_wait(&wq_cv, &wq_mtx);

    // queue is empty, and we're not adding anything else. latch
    //  the mutex, set the shutdown flag, and tell all the threads.
    //  they need to terminate.
    word_shutdown = 1;
    pthread_mutex_unlock(&wq_mtx);
    pthread_cond_broadcast(&wq_cv);

    for (i=0;i<MAX_THREADS; ++i)
        pthread_join(thread_id[i], NULL);

    return EXIT_SUCCESS;
}


// the work crew will start by locking the mutex, then entering the
//  work loop, looking for entries or a shutdown state
void *dowork(void *arguments)
{
    int n_processed = 0;
    while (1)
    {
        pthread_mutex_lock(&wq_mtx);
        while (first_word == NULL && word_shutdown == 0)
            pthread_cond_wait(&wq_cv, &wq_mtx);

        // we own the mutex, and thus current access to the predicate
        //  values it protects.
        if (first_word != NULL)
        {
            // pull the item off the queue. once we do that we own the
            //  item, so we can unlatch and let another waiter know there
            //  may be more data on the queue.
            word_list *p = first_word;
            first_word = p->next;
            if (p->next)
                pthread_cond_signal(&wq_cv);
            pthread_mutex_unlock(&wq_mtx);

            //
            // TODO: process item here.
            //
            ++n_processed;
            free(p);
        }
        else if (word_shutdown != 0)
            break;
    }

    // we still own the mutex. report on how many items we received, then
    //  one more signal to let someone (anyone, actually) know we're done.
    pthread_t self = pthread_self();
    printf("%p : processed %d items.\n",self, n_processed);
    pthread_mutex_unlock(&wq_mtx);
    pthread_cond_signal(&wq_cv);
    return NULL;
}

Sample Output: MAX_THREADS = 4 (your output will vary)

0x100387000 : processed 64909 items.
0x100304000 : processed 64966 items.
0x1000b5000 : processed 64275 items.
0x100281000 : processed 67994 items.

Sample Output: MAX_THREADS = 8

0x100304000 : processed 31595 items.
0x1000b5000 : processed 33663 items.
0x100593000 : processed 34298 items.
0x10040a000 : processed 32304 items.
0x10048d000 : processed 32406 items.
0x100387000 : processed 31878 items.
0x100281000 : processed 32317 items.
0x100510000 : processed 33683 items.

Sample Output: MAX_THREADS = 16

0x10079f000 : processed 17239 items.
0x101081000 : processed 16530 items.
0x101104000 : processed 16662 items.
0x100699000 : processed 16562 items.
0x10040a000 : processed 16672 items.
0x100593000 : processed 15158 items.
0x10120a000 : processed 17365 items.
0x101187000 : processed 14184 items.
0x100387000 : processed 16332 items.
0x100616000 : processed 16497 items.
0x100281000 : processed 16632 items.
0x100304000 : processed 16222 items.
0x100510000 : processed 17188 items.
0x10048d000 : processed 15367 items.
0x1000b5000 : processed 16912 items.
0x10071c000 : processed 16622 items.

And just because we can, with full global optimization enabled

Sample Output: MAX_THREADS = 32, MAX_ITEMS = 4194304

0x109c58000 : processed 260000 items.
0x109634000 : processed 263433 items.
0x10973a000 : processed 262125 items.
0x10921c000 : processed 261201 items.
0x108d81000 : processed 262325 items.
0x109a4c000 : processed 262318 items.
0x108f8d000 : processed 263107 items.
0x109010000 : processed 261382 items.
0x109946000 : processed 262299 items.
0x109199000 : processed 261930 items.
0x10929f000 : processed 263506 items.
0x109093000 : processed 262362 items.
0x108e87000 : processed 262069 items.
0x108e04000 : processed 261890 items.
0x109acf000 : processed 261875 items.
0x1097bd000 : processed 262040 items.
0x109840000 : processed 261686 items.
0x1093a5000 : processed 262547 items.
0x109b52000 : processed 261980 items.
0x109428000 : processed 264259 items.
0x108f0a000 : processed 261620 items.
0x1095b1000 : processed 263062 items.
0x1094ab000 : processed 261811 items.
0x1099c9000 : processed 262709 items.
0x109116000 : processed 261628 items.
0x109bd5000 : processed 260905 items.
0x10952e000 : processed 262741 items.
0x1098c3000 : processed 260608 items.
0x109322000 : processed 261970 items.
0x1000b8000 : processed 262061 items.
0x100781000 : processed 262669 items.
0x1096b7000 : processed 262490 items.

Hmmm. and I didn't use volatile in any of this. Must be time to buy a lotto ticket.

Anyway, I suggest doing some research on pthreads, particularly on mutex and condition variable control and their interactions. I hope this helps you out.



回答2:

So you want to process lots of data by splitting the work across multiple threads. Your solution is not very efficient because your threads are going to be fighting a lot of who owns the mutex and you can't be sure the work is evenly spread across all your threads. So, for example, threads 0 and 1 could be getting all the work as they get first access to the mutex and all the other threads are just idle all the time.

If you want to improve the performance, you need to do the following:-

  • Make all the threads independent of each other, i.e. remove synchronised data
  • Ensure memory coherency between threads, i.e. make sure the data for item n+1 is next to data for item n. This helps the CPU access memory better. Jumping around RAM a lot will generate lots of cache misses which kill performance.

So, in your program, instead of a single linked list that is shared across all threads, have a linked list for each thread:-

typedef struct _word_list
{
  //data
  struct _word_list *next;
} word_list;

static const int num_threads = 4; // actually, setting this to number of CPUs at run time would be better

word_list 
  *lists [num_threads] = {0};

void ReadWords ()
{
  word_list
    **current [num_threads];

  for (int i = 0 ; i < num_threads ; ++i)
  {
    current = &lists [i];
  }

  int destination = 0;

  while (read some valid input)
  {
    *current [destination] = malloc (sizeof (word_list));
    // set data 
    current [destination] = &current [destination]->next;

    destination = (destination + 1) % num_threads;
  }

  // data has now been read and stored into a series of linked lists, each list having
  // the same number of items (or one less)
}


void create_threads ()
{
   for (int i = 0 ; i < num_threads ; ++i)
   {
      // create thread, and pass it the value of lists [i]
   }
}

void do_work (...)
{
   for (word_list *item = passed in parameter ; item ; item = item->next)
   {
     process data
   }
}

In this program (just made it up, haven't checked it) I create four linked lists and evenly assign data to the lists. Then I create the threads and give each thread one of the linked lists. Each thread then processes its own linked list (they are separate lists).

Each thread can now run at full speed and never has to wait on a mutex to get data. Memory accesses are reasonable but dependent on the allocator to a large extent. Using an array rather than a linked list would improve this, but you'd need to know the number of data items before allocating the arrays which might not be possible.



回答3:

Let me see if i understand this correctly?

  • struct word_list describes some kind of linked list
  • You want to spread out the elements of that list among the threads.

If this is what you want then I would just pop the elements one by one from the list and write the pointer to the rest back:

volatile struct word_list * first_word = NULL; // important to make it volatile

void *dowork(void *arguments)
{
    struct word_list * curr_word;
    char myword[20];
    do {
        // gain exclusive access to the control structures
        pthread_mutex_lock(&thrd_list);

        // get the next element
        curr_word = first_word;
        if (curr_word == NULL) {
            pthread_mutex_unlock(&thrd_list);
            break;
        }

        // notify the remaining threads what the next element is
        first_word = curr_word->next;

        pthread_mutex_unlock(&thrd_list);

        // do whatever you have to do

    } while (1);

}

Make an additional global volatile struct word_list * next_word if you don't want to modify first_word. Make sure to make it volatile, else the compiler may perform optimisations that lead to weird results.