How to reuse threads in .NET 3.5

Posted 2019-01-22 10:18

Question:

I have a subroutine that processes large blocks of information. In order to make use of the entire CPU, it divides the work up into separate threads. After all threads have completed, it finishes. I read that creating and destroying threads uses lots of overhead, so I tried using the threadpool, but that actually runs slower than creating my own threads. How can I create my own threads when the program runs and then keep reusing them? I've seen some people say it can't be done, but the threadpool does it so it must be possible, right?

Here is part of the code that launches new threads / uses the threadpool:

//initialization for threads
Thread[] AltThread = null;
if (NumThreads > 1)
    AltThread = new Thread[NumThreads - 1];

do
{
    if (NumThreads > 1)
    {   //split the matrix up into NumThreads number of even-sized blocks and execute on separate threads
        int ThreadWidth = DataWidth / NumThreads;
        if (UseThreadPool) //use threadpool threads
        {
            for (int i = 0; i < NumThreads - 1; i++)
            {
                ThreadPool.QueueUserWorkItem(ComputePartialDataOnThread, 
                    new object[] { AltEngine[i], ThreadWidth * (i + 1), ThreadWidth * (i + 2) });
            }
            //get number of threads available after queue
            System.Threading.Thread.Sleep(0);
            int StartThreads, empty, EndThreads;
            ThreadPool.GetAvailableThreads(out StartThreads, out empty);
            ComputePartialData(ThisEngine, 0, ThreadWidth);

            //wait for all threads to finish
            do
            {
                ThreadPool.GetAvailableThreads(out EndThreads, out empty);
                System.Threading.Thread.Sleep(1);
            } while (StartThreads - EndThreads > 0);
        }
        else //create new threads each time (can we reuse these?)
        {
            for (int i = 0; i < NumThreads - 1; i++)
            {
                AltThread[i] = new Thread(ComputePartialDataOnThread);
                AltThread[i].Start(new object[] { AltEngine[i], ThreadWidth * (i + 1), ThreadWidth * (i + 2) });
            }
            ComputePartialData(ThisEngine, 0, ThreadWidth);

            //wait for all threads to finish
            foreach (Thread t in AltThread)
                t.Join(1000);
            foreach (Thread t in AltThread)
                if (t.IsAlive) t.Abort();
        }
    }
} //the do-loop's while(...) condition is omitted from this excerpt

ComputePartialDataOnThread simply unpacks the arguments and calls ComputePartialData. The data being processed is shared among the threads (they don't read or write the same locations). AltEngine[] is a separate computation engine for each thread.
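
For reference, that wrapper presumably looks something like the sketch below; the ComputeEngine type name is a placeholder for whatever type AltEngine[] actually holds.

void ComputePartialDataOnThread(object state)
{
    //unpack the object[] built at the call sites above
    object[] args = (object[])state;
    ComputePartialData((ComputeEngine)args[0], (int)args[1], (int)args[2]); //ComputeEngine is a placeholder type name
}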

The operation runs about 10-20% slower when using the threadpool.

Answer 1:

This sounds like a fairly common requirement that can be solved by a multi-threaded producer-consumer queue. The threads are kept alive and are signaled to do work whenever new work is added to the queue. The work is represented by a delegate (in your case ComputePartialDataOnThread), and the data passed to the delegate is what gets queued (in your case the parameters to ComputePartialDataOnThread). The useful feature is that the code managing the worker threads is kept separate from the actual algorithm. Here is the producer-consumer queue:

using System;
using System.Collections.Generic;
using System.Threading;

public class SuperQueue<T> : IDisposable where T : class
{
    readonly object _locker = new object();
    readonly List<Thread> _workers;
    readonly Queue<T> _taskQueue = new Queue<T>();
    readonly Action<T> _dequeueAction;

    /// <summary>
    /// Initializes a new instance of the <see cref="SuperQueue{T}"/> class.
    /// </summary>
    /// <param name="workerCount">The worker count.</param>
    /// <param name="dequeueAction">The dequeue action.</param>
    public SuperQueue(int workerCount, Action<T> dequeueAction)
    {
        _dequeueAction = dequeueAction;
        _workers = new List<Thread>(workerCount);

        // Create and start a separate thread for each worker
        for (int i = 0; i < workerCount; i++)
        {
            Thread t = new Thread(Consume) { IsBackground = true, Name = string.Format("SuperQueue worker {0}", i) };
            _workers.Add(t);
            t.Start();
        }
    }

    /// <summary>
    /// Enqueues the task.
    /// </summary>
    /// <param name="task">The task.</param>
    public void EnqueueTask(T task)
    {
        lock (_locker)
        {
            _taskQueue.Enqueue(task);
            Monitor.PulseAll(_locker);
        }
    }

    /// <summary>
    /// Consumes this instance.
    /// </summary>
    void Consume()
    {
        while (true)
        {
            T item;
            lock (_locker)
            {
                while (_taskQueue.Count == 0) Monitor.Wait(_locker);
                item = _taskQueue.Dequeue();
            }
            if (item == null) return;

            // run actual method
            _dequeueAction(item);
        }
    }

    /// <summary>
    /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
    /// </summary>
    public void Dispose()
    {
        // Enqueue one null task per worker to make each exit.
        _workers.ForEach(thread => EnqueueTask(null));

        _workers.ForEach(thread => thread.Join());
    }
}
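
To show how the worker threads get created once and then reused for every pass, here is a minimal, self-contained usage sketch. The WorkItem type, the Math.Sqrt stand-in for the real per-block computation, and the ManualResetEvent-per-item completion signal are all illustrative assumptions rather than part of the asker's code:

using System;
using System.Collections.Generic;
using System.Threading;

class WorkItem
{
    public double[] Data;                                   //shared buffer (each item covers a distinct range)
    public int Start, End;                                  //range this item covers
    public ManualResetEvent Done = new ManualResetEvent(false);
}

static class SuperQueueDemo
{
    static void Main()
    {
        double[] data = new double[1000000];
        int workerCount = Math.Max(1, Environment.ProcessorCount - 1);

        //workers are created once here and reused for every pass
        using (SuperQueue<WorkItem> queue = new SuperQueue<WorkItem>(workerCount, item =>
        {
            for (int i = item.Start; i < item.End; i++)
                item.Data[i] = Math.Sqrt(i);                //stand-in for the real per-block computation
            item.Done.Set();                                //tell the producer this block is finished
        }))
        {
            int width = data.Length / (workerCount + 1);
            List<WorkItem> items = new List<WorkItem>();
            for (int w = 0; w < workerCount; w++)
            {
                WorkItem item = new WorkItem { Data = data, Start = width * (w + 1),
                    End = Math.Min(width * (w + 2), data.Length) };
                items.Add(item);
                queue.EnqueueTask(item);
            }

            //process the first block on the calling thread, then wait for the rest
            for (int i = 0; i < width; i++)
                data[i] = Math.Sqrt(i);
            foreach (WorkItem item in items)
                item.Done.WaitOne();
        }
    }
}

Because EnqueueTask only pulses already-running workers, later passes reuse the same threads instead of paying the thread-creation cost each time.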

As previous posters have said, there are many built-in constructs (the TPL, for example) that use the ThreadPool and are worth looking at before implementing your own queue.



Answer 2:

So the usual way one would do this is to have each thread's entry point essentially do something like the following (this is just the algorithm, not C# code, sorry):

  1. Check to see if you have work to do
  2. Do work if found
  3. Wait on a signal

On the other side, whenever you have more work for your thread, add it to the queue of work to do and signal the thread; the thread is then, in essence, being reused. This is pretty much how one would implement a thread pool oneself (if you were inside the runtime you could do a few extra things to help, but it's not a big deal).
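
Here is a hedged C# sketch of that loop, using an AutoResetEvent as the "signal" and a lock-protected queue as the pending work; the ReusableWorker name and its members are illustrative, not from the original post:

using System;
using System.Collections.Generic;
using System.Threading;

class ReusableWorker
{
    readonly object _sync = new object();
    readonly Queue<Action> _work = new Queue<Action>();
    readonly AutoResetEvent _signal = new AutoResetEvent(false);

    public ReusableWorker()
    {
        Thread t = new Thread(Run) { IsBackground = true };
        t.Start();
    }

    public void Add(Action job)
    {
        lock (_sync) _work.Enqueue(job);    //more work for the thread...
        _signal.Set();                      //...so wake it up
    }

    void Run()
    {
        while (true)
        {
            Action job = null;
            lock (_sync)
            {
                if (_work.Count > 0) job = _work.Dequeue();   //1. check for work
            }
            if (job != null)
                job();                                        //2. do the work
            else
                _signal.WaitOne();                            //3. wait on a signal
        }
    }
}

Since the Thread started in the constructor is never torn down, every job passed to Add runs on the same reused thread.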



Answer 3:

Here's a thread that talks about this very thing: A custom thread-pool/queue class.