Thread Wait For Parent

Posted 2020-02-06 03:33

I am implementing a simple thread pool mechanism for my Ubuntu server (for my multi-client anonymous chat program), and I need to make my worker threads sleep until a job (in the form of a function pointer and parameter) needs to be performed.

My current system is going out the window. Each worker thread asks the manager whether a job is available and, if there isn't one, sleeps for 5 ms. If there is one, it adds the job to the working queue and runs the function. A wretched waste of cycles.

What I'd like to do is make a simple event-like system. I'm thinking about having a vector of mutexes (one for each worker) and having the handle to the mutex passed in as a parameter at creation. Then in my manager class (which holds and hands out jobs), whenever a thread is created, lock its mutex. When a job needs to be performed, unlock the next mutex in line, wait for it to be locked and unlocked, and relock it. However, I'm wondering if there's a much better means to this end.


tl;dr: So my question is this. What is the most efficient, effective, and safest way to make a thread wait for a job from a managing class? Is polling a technique I should even consider (more than 1000 clients at a time)? Is mutex locking decent? Or are there other techniques?

5 answers
女痞
#2 · 2020-02-06 03:45

The usual way this is implemented is to have a queue (queue) of outstanding work, a mutex (mutex) protecting the queue, and a wait condition (queue_not_empty). Then, each worker thread does the following (using a pseudo-API):

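while (true) {
    Work * work = 0;
    mutex.lock();
    while ( queue.empty() ) {
       if ( !queue_not_empty.wait( &mutex, timeout ) ) {
           mutex.unlock(); // wait() re-locked the mutex; release it before leaving
           return; // timeout - exit the worker thread
       }
    }
    work = queue.front();
    queue.pop_front();
    mutex.unlock();  // never hold the lock while running the job
    work->perform();
}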

The wait( &mutex, timeout ) call blocks until either the wait condition is signaled, or the call times out. The mutex passed is atomically unlocked inside wait(), and locked again before returning from the call, to provide a consistent view of the queue to all participants. timeout would be chosen rather large (seconds), and would lead to the thread exiting (the thread pool would start new ones if more work came in).

Meanwhile, the thread pool's work insertion function does this:

Work * work = ...;
mutex.lock();
queue.push_back( work );
if ( worker.empty() )
    start_a_new_worker();
queue_not_empty.wake_one();
mutex.unlock();
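
For reference, the same pattern can be sketched with the C++11 standard library. This is only an illustration under a few assumptions: WorkQueue, push, worker_loop and run_demo are made-up names, a job is modelled as a std::function<void()>, and starting new workers on demand is left out.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical work queue: a deque guarded by a mutex plus a condition variable.
class WorkQueue {
public:
    explicit WorkQueue(std::chrono::milliseconds idle_timeout)
        : idle_timeout_(idle_timeout) {}

    // Producer side: enqueue under the lock, then wake one waiting worker.
    void push(std::function<void()> work) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push_back(std::move(work));
        }
        queue_not_empty_.notify_one();
    }

    // Worker side: run jobs until the queue stays empty for idle_timeout_.
    void worker_loop() {
        for (;;) {
            std::function<void()> work;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                // wait_for releases the mutex while blocked and re-acquires it
                // before returning; the predicate handles spurious wakeups.
                bool has_work = queue_not_empty_.wait_for(
                    lock, idle_timeout_, [this] { return !queue_.empty(); });
                if (!has_work)
                    return;  // timed out with nothing to do: exit the worker
                work = std::move(queue_.front());
                queue_.pop_front();
            }  // lock released here, before the job runs
            work();
        }
    }

private:
    std::chrono::milliseconds idle_timeout_;
    std::mutex mutex_;
    std::condition_variable queue_not_empty_;
    std::deque<std::function<void()>> queue_;
};

// Demo helper: queues `jobs` counter increments, drains them with `workers`
// threads, and returns how many jobs actually ran.
int run_demo(int workers, int jobs) {
    assert(workers > 0);
    WorkQueue q(std::chrono::milliseconds(200));
    std::atomic<int> done(0);
    for (int i = 0; i < jobs; ++i)
        q.push([&done] { ++done; });
    std::vector<std::thread> threads;
    for (int i = 0; i < workers; ++i)
        threads.emplace_back([&q] { q.worker_loop(); });
    for (auto& t : threads)
        t.join();  // each worker exits after 200 ms without work
    return done.load();
}
```

Calling run_demo(4, 100) runs 100 jobs on four workers. The scoped unique_lock plays the role of the explicit lock()/unlock() pair, so the timeout path cannot leave the mutex locked.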
再贱就再见
#3 · 2020-02-06 03:45

Since a network chat program is presumably I/O-bound rather than CPU-bound, you don't really need threads. You can handle all your I/O in a single thread using a facility such as Boost.Asio or the GLib main loop. These are portable abstractions over platform-specific functions that allow a program to block waiting for activity on any of a (potentially large) set of open files or sockets, and then wake up and respond promptly when activity occurs.
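
To give a feel for what such libraries do underneath, here is a minimal sketch that calls POSIX poll() directly instead of going through Boost.Asio or GLib. It is not how either library is meant to be used. read_ready and demo are hypothetical names, and two pipes stand in for client sockets.

```cpp
#include <cassert>
#include <poll.h>
#include <unistd.h>
#include <string>
#include <vector>

// Blocks until at least one descriptor in `fds` is readable or the timeout
// expires, then reads once from every ready descriptor. Returns the bytes
// read, one string per descriptor, in the same order as `fds`.
std::vector<std::string> read_ready(const std::vector<int>& fds, int timeout_ms) {
    assert(!fds.empty());
    std::vector<pollfd> watched;
    for (int fd : fds) {
        pollfd p;
        p.fd = fd;
        p.events = POLLIN;  // we only care about "data available to read"
        p.revents = 0;
        watched.push_back(p);
    }
    std::vector<std::string> out(fds.size());
    int ready = poll(watched.data(), watched.size(), timeout_ms);
    if (ready <= 0)
        return out;  // timeout or error: nothing was read
    for (std::size_t i = 0; i < watched.size(); ++i) {
        if (watched[i].revents & POLLIN) {
            char buf[256];
            ssize_t n = read(watched[i].fd, buf, sizeof buf);
            if (n > 0)
                out[i].assign(buf, static_cast<std::size_t>(n));
        }
    }
    return out;
}

// Demo: two pipes play the part of two clients; only the second one "speaks".
std::string demo() {
    int a[2], b[2];
    if (pipe(a) != 0 || pipe(b) != 0)
        return "pipe failed";
    if (write(b[1], "hello", 5) != 5)
        return "write failed";
    std::vector<std::string> got = read_ready({a[0], b[0]}, 1000);
    close(a[0]); close(a[1]);
    close(b[0]); close(b[1]);
    return got[0] + "|" + got[1];
}
```

A single thread sleeps inside poll() and wakes only when a descriptor has something to say, so no worker threads and no polling timers are needed for the I/O itself.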

我只想做你的唯一
#4 · 2020-02-06 03:48

What you need is a condition variable.
All the worker threads call wait(), which suspends them.

The parent thread then puts a work item on a queue and calls signal on the condition variable. This wakes one sleeping thread, which removes the job from the queue, executes it, and then calls wait on the condition variable again to go back to sleep.

Try:

#include <pthread.h>
#include <memory>
#include <list>
#include <vector>

// Use RAII to do the lock/unlock
struct MutexLock
{
    MutexLock(pthread_mutex_t& m) : mutex(m)    { pthread_mutex_lock(&mutex); }
    ~MutexLock()                                { pthread_mutex_unlock(&mutex); }
    private:
        pthread_mutex_t&    mutex;
};

// The base class of all work we want to do.
struct Job
{
    virtual ~Job() {}           // Jobs are deleted through a Job*, so this must be virtual.
    virtual void doWork()  = 0;
};

// pthreads is a C library, so the callback must be a C function.
extern "C" void* threadPoolThreadStart(void*);

// The very bare minimal part of a thread pool.
// The constructor creates the worker threads, which enter workerStart()
// through threadPoolThreadStart().
class ThreadPool
{

    public:
         ThreadPool(unsigned int threadCount=1);
        ~ThreadPool();

        void addWork(std::auto_ptr<Job> job);
    private:

        friend void* threadPoolThreadStart(void*);
        void workerStart();

        std::auto_ptr<Job>  getJob();

        bool                finished;   // Workers keep waiting for jobs while this is false.
        pthread_mutex_t     mutex;      // A lock so that we can sequence accesses.
        pthread_cond_t      cond;       // The condition variable that is used to hold worker threads.
        std::list<Job*>     workQueue;  // A queue of jobs.
        std::vector<pthread_t> threads; // Handles of the worker threads.
};

// Create the thread pool
ThreadPool::ThreadPool(unsigned int threadCount)
    : finished(false)
    , threads(threadCount)
{
    // If we fail creating either pthread object then throw a fit.
    if (pthread_mutex_init(&mutex, NULL) != 0)
    {   throw int(1);
    }

    if (pthread_cond_init(&cond, NULL) != 0)
    {
        pthread_mutex_destroy(&mutex);
        throw int(2);
    }
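    for(unsigned int loop=0; loop < threadCount;++loop)
    {
       // pthread_create() takes a pointer to the pthread_t it fills in.
       if (pthread_create(&threads[loop], NULL, threadPoolThreadStart, this) != 0)
       {
            // One thread failed: tell the workers already started to exit,
            // then wait for them. (pthread_kill() with signal 9 would kill
            // the whole process, not just one thread.)
            {
                MutexLock lock(mutex);
                finished = true;
                pthread_cond_broadcast(&cond);
            }
            for(unsigned int done = 0; done < loop; ++done)
            {
                pthread_join(threads[done], NULL);
            }
            // The destructor will not run, so release the pthread objects here.
            pthread_cond_destroy(&cond);
            pthread_mutex_destroy(&mutex);
            throw int(3);
       }
    }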
}

// Clean up any leftovers.
// Note: The destructor tells the workers to stop and waits for them,
//       but it does not wait for queued jobs to run. Jobs still in the
//       queue are deleted without being executed.
ThreadPool::~ThreadPool()
{
    {
        // Set the flag under the lock so a worker cannot miss it between
        // testing 'finished' and calling pthread_cond_wait().
        MutexLock lock(mutex);
        finished = true;
        // Wake every waiting worker.
        pthread_cond_broadcast(&cond);
    }
    for(std::vector<pthread_t>::iterator loop = threads.begin();loop != threads.end(); ++loop)
    {
        // Wait for all threads to exit (they will as finished is true and
        //                               we sent enough signals to make sure
        //                               they are running).
        void*  result;
        pthread_join(*loop, &result);
    }
    // Destroy the pthread objects.
    pthread_cond_destroy(&cond);
    pthread_mutex_destroy(&mutex);

    // Delete all remaining jobs.
    // Notice how we took ownership of the jobs.
    for(std::list<Job*>::const_iterator loop = workQueue.begin(); loop != workQueue.end();++loop)
    {
        delete *loop;
    }
}

// Add a new job to the queue.
// Signal the condition variable. This will wake a waiting worker;
// otherwise the job will wait for a worker to finish processing its current job.
void ThreadPool::addWork(std::auto_ptr<Job> job)
{
    MutexLock  lock(mutex);

    workQueue.push_back(job.release());
    pthread_cond_signal(&cond);
}

// Start a thread.
// Make sure no exceptions escape as that is bad.
void* threadPoolThreadStart(void* data)
{
    ThreadPool* pool = static_cast<ThreadPool*>(data);
    try
    {
        pool->workerStart();
    }
    catch(...){}
    return NULL;
}

// This is the main worker loop.
void ThreadPool::workerStart()
{
    while(!finished)
    {
        std::auto_ptr<Job>    job    = getJob();
        if (job.get() != NULL)
        {
            job->doWork();
        }
    }
}

// The workers come here to get a job.
// If there are none in the queue they are suspended waiting on cond
// until a new job is added above.
std::auto_ptr<Job> ThreadPool::getJob()
{
    MutexLock  lock(mutex);

    while((workQueue.empty()) && (!finished))
    {
        pthread_cond_wait(&cond, &mutex);
        // The wait releases the mutex lock and suspends the thread (until a signal).
        // When a thread wakes up it is held until it can acquire the mutex, so when we
        // get here the mutex is again locked.
        //
        // Note: You must use while() here because of the following situation.
        //   Two workers:  Worker A processing job A.
        //                 Worker B suspended on condition variable.
        //   Parent adds a new job and calls signal.
        //   This wakes up Worker B. But it is possible for Worker A to finish its
        //   work and lock the mutex before Worker B is released from the above call.
        //
        //   If that happens then Worker A will see that the queue is not empty,
        //   grab the work item in the queue and start processing. Worker B will
        //   then lock the mutex and proceed here. If the above were not a while then
        //   it would try to remove an item from an empty queue. With a while it sees
        //   that the queue is empty and re-suspends on the condition variable above.
    }
    std::auto_ptr<Job>  result;
    if (!finished)
    {    result.reset(workQueue.front());
         workQueue.pop_front();
    }

    return result;
}
爱情/是我丢掉的垃圾
#5 · 2020-02-06 03:57

The easiest way to do this is semaphores. This is how a semaphore works:

A semaphore is basically a variable that takes zero or positive values. Processes can interact with it in two ways: increasing or decreasing it.

Increasing the semaphore adds 1 to this magical variable, and that's about it. It's in decreasing the count that things get interesting: if the count reaches zero and a process tries to lower it again, since it can't take negative values, it will block until the variable rises.

If multiple processes are blocked waiting to decrease the semaphore value, only one is woken up for each unit the count is increased.

This makes it very easy to create a worker/task system: your manager process queues tasks and increases the value of the semaphore to match the remaining items, and your worker processes constantly try to decrease the count and acquire a task. When no tasks are available, they'll block and consume no CPU time. When one appears, only one of the dormant processes will awake. Insta-sync magic.

Unfortunately, at least in the Unix world, the System V semaphore API (semget/semop) is not very friendly, since for some reason it deals with arrays of semaphores rather than individual ones. POSIX semaphores (sem_init, sem_wait, sem_post) work on single semaphores, and either way you're a simple wrapper away from a nice interface!
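
Such a wrapper might look roughly like the sketch below. It uses POSIX unnamed semaphores between threads on Linux rather than the array-based API. SemQueue and run_demo are made-up names, jobs are plain ints, and a negative job serving as a stop request is purely an assumption of this example.

```cpp
#include <atomic>
#include <cassert>
#include <cerrno>
#include <deque>
#include <mutex>
#include <semaphore.h>
#include <thread>
#include <vector>

// Hypothetical queue: the semaphore counts queued jobs, the mutex guards the deque.
class SemQueue {
public:
    SemQueue()  { sem_init(&items_, 0, 0); }  // shared between threads only, count starts at 0
    ~SemQueue() { sem_destroy(&items_); }

    void push(int job) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            jobs_.push_back(job);
        }
        sem_post(&items_);  // increase: wakes at most one blocked worker
    }

    int pop() {
        // decrease: blocks while the count is zero; retry if a signal interrupts us
        while (sem_wait(&items_) == -1 && errno == EINTR) {}
        std::lock_guard<std::mutex> lock(mutex_);
        int job = jobs_.front();
        jobs_.pop_front();
        return job;
    }

private:
    sem_t items_;
    std::mutex mutex_;
    std::deque<int> jobs_;
};

// Demo helper: workers add up the job values 1..jobs and exit on a negative job.
long run_demo(int workers, int jobs) {
    assert(workers > 0);
    SemQueue q;
    std::atomic<long> sum(0);
    std::vector<std::thread> threads;
    for (int i = 0; i < workers; ++i)
        threads.emplace_back([&q, &sum] {
            for (;;) {
                int job = q.pop();
                if (job < 0)
                    return;  // stop request (an assumption of this sketch)
                sum += job;
            }
        });
    for (int j = 1; j <= jobs; ++j)
        q.push(j);
    for (int i = 0; i < workers; ++i)
        q.push(-1);  // one stop request per worker, queued after the real jobs
    for (auto& t : threads)
        t.join();
    return sum.load();
}
```

Idle workers sit inside sem_wait() and use no CPU, and each sem_post() releases exactly one of them, which is the behaviour described above.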

Cheers!

混吃等死
#6 · 2020-02-06 04:11

Classical producer-consumer synchronization with multiple consumers (the worker threads consume work requests). The well-known technique is to have a semaphore: each worker thread does down(), and each time you have a work request, you do up(). The woken worker then picks the request from a mutex-locked work queue. Since one up() will only wake up one down(), there will actually be minimal contention on the mutex.

Alternatively, you can do the same with a condition variable: each thread waits on it, and you wake one up when you have a job. The queue itself is still locked with a mutex (a condvar requires one anyway).

Lastly, I am not completely sure, but I think you can actually use a pipe as the queue, including all the synchronization (the worker threads simply try to "read(sizeof(request))"). A bit hacky, but it leads to fewer context switches.
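
A rough sketch of the pipe idea, with the same caveat about not being completely sure. POSIX guarantees that a write of at most PIPE_BUF bytes is atomic. That every read() then returns exactly one whole request is treated here as an assumption, and the worker stops if it ever sees anything else. Request and run_demo are hypothetical names.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <unistd.h>
#include <vector>

// Hypothetical fixed-size request, far smaller than PIPE_BUF.
struct Request {
    int value;
};

// Demo helper: sends the values 1..requests through a pipe to `workers`
// threads and returns the sum of everything the workers received.
long run_demo(int workers, int requests) {
    assert(workers > 0);
    int fds[2];
    if (pipe(fds) != 0)
        return -1;
    const int read_fd = fds[0];
    const int write_fd = fds[1];

    std::atomic<long> sum(0);
    std::vector<std::thread> threads;
    for (int i = 0; i < workers; ++i)
        threads.emplace_back([read_fd, &sum] {
            Request r;
            // read() blocks in the kernel until a request arrives. It returns 0
            // once the write end is closed, which ends the loop; a short read
            // (not expected under the assumption above) ends it as well.
            while (read(read_fd, &r, sizeof r) == static_cast<ssize_t>(sizeof r))
                sum += r.value;
        });

    for (int v = 1; v <= requests; ++v) {
        Request r = { v };
        if (write(write_fd, &r, sizeof r) != static_cast<ssize_t>(sizeof r))
            break;  // give up on error; the workers are still shut down below
    }
    close(write_fd);  // end-of-file tells every worker to exit
    for (auto& t : threads)
        t.join();
    close(read_fd);
    return sum.load();
}
```

No mutex or condition variable appears in user code because the kernel does the queuing and the blocking. The price is that requests must be small, fixed-size values (or pointers) that fit in a single atomic write.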
