Most efficient way to process a queue with threads

2019-03-12 16:13发布

问题:

I have a queue onto which pending fourier transform requests (comparatively time consuming operations) are placed - we could get thousands of transform requests per second in some cases, so its gotta be quick.

I'm upgrading the old code to use .net 4, as well as porting to TPL. I'm wondering what the most efficient (fastest throughput) way to handle this queue looks like. I'd like to use all cores available.

Currently I am experimenting with a BlockingCollection. I create a queue handler class that spawns 4 tasks, which block on the BlockingCollection and wait for incoming work. They then process that pending transform. Code:

public class IncomingPacketQueue : IDisposable
    {
        BlockingCollection<IncomingPacket> _packetQ = new BlockingCollection<IncomingPacket>();

        public IncomingPacketQueue(int workerCount)
        {
            for (int i = 0; i < workerCount; i++)
            {
                Task.Factory.StartNew(Consume);
            }
        }

        public void EnqueueSweep(IncomingPacket incoming)
        {
            _packetQ.Add(incoming);
        }

        private void Consume()
        {
            foreach (var sweep in _packetQ.GetConsumingEnumerable())
            {
                //do stuff
                var worker = new IfftWorker();
                Trace.WriteLine("  Thread {0} picking up a pending ifft".With(Thread.CurrentThread.ManagedThreadId));
                worker.DoIfft(sweep);                

            }
        }

        public int QueueCount
        {
            get
            {
                return _packetQ.Count;
            }
        }

    #region IDisposable Members

    public void Dispose()
    {
        _packetQ.CompleteAdding();
    }

    #endregion
    }

Does this look like a good solution? It seems to max out all cores - although I'm currently unsure how many workers I should spawn in my constructor.

回答1:

That looks reasonable. I've found BlockingCollection to be quite fast. I use it to process tens of thousands of requests per second.

If your application is processor bound, then you probably don't want to create more workers than you have cores. Certainly you don't want to create a lot more workers than cores. On a quad core machine, if you expect most of the time to be spent doing the FFTs, then four workers will eat all the CPU. More workers just means more that you have thread context switches to deal with. The TPL will typically balance that for you, but there's no reason to create, say, 100 workers when you can't handle more than a handful.

I would suggest that you run tests with 3, 4, 5, 6, 7, and 8 workers. See which one gives you the best throughput.



回答2:

I agree with Jim. Your approach looks really good. You are not going to get much better this. I am not an FFT expert, but I am assuming these operations are nearly 100% CPU bound. If that is indeed the case then a good first guess at the number of workers would be a direct 1-to-1 correlation with the number of cores in the machine. You can use Environment.ProcessorCount to get this value. You could experiment with a multiplier of say 2x or 4x, but again, if these operations are CPU bound then anything higher than 1x might just cause more overhead. Using Environment.ProcessorCount would make your code more portable.

Another suggestion...let the TPL know that these are dedicated threads. You can do this by specifying the LongRunning option.

public IncomingPacketQueue()
{
    for (int i = 0; i < Environment.ProcessorCount; i++)
    {
        Task.Factory.StartNew(Consume, TaskCreationOptions.LongRunning);
    }
}


回答3:

Why not use Parallel.ForEach and let TPL handle the number of threads created.

        Parallel.ForEach(BlockingCollectionExtensions.GetConsumingPartitioneenter(_packetQ),
                         sweep => {
                           //do stuff
                           var worker = new IfftWorker();
                           Trace.WriteLine("  Thread {0} picking up a pending ifft".With(Thread.CurrentThread.ManagedThreadId));
                           worker.DoIfft(sweep);                

                         });

(the GetConsumingPartitioner is part of the ParallelExtensionsExtras)



回答4:

Make the number of workers configruable. Also too many workers and it will get slower (as indicated by another poster), so you need to find the sweet spot. A configurable value would allow test runs to find the optimal value or would allow your program to be adaptable for different types of hardware. YOu could certainly place this value in App.Config and read it on startup.



回答5:

You could also try using PLINQ to parallelize the processing to see how it compares to the approach you're currently using. It has some tricks up its sleeve that can make it very efficient under certain circumstances.

_packetQ.GetConsumingEnumerable().AsParallel().ForAll(
    sweep => new IfftWorker().DoIfft(sweep));