I have a queue onto which pending Fourier transform requests (comparatively time-consuming operations) are placed. We could get thousands of transform requests per second in some cases, so it has to be quick.
I'm upgrading the old code to .NET 4 and porting it to the TPL. I'm wondering what the most efficient (highest-throughput) way to handle this queue looks like. I'd like to use all available cores.
Currently I am experimenting with a BlockingCollection. I create a queue handler class that spawns 4 tasks, which block on the BlockingCollection and wait for incoming work. They then process that pending transform. Code:
    public class IncomingPacketQueue : IDisposable
    {
        BlockingCollection<IncomingPacket> _packetQ = new BlockingCollection<IncomingPacket>();

        public IncomingPacketQueue(int workerCount)
        {
            for (int i = 0; i < workerCount; i++)
            {
                Task.Factory.StartNew(Consume);
            }
        }

        public void EnqueueSweep(IncomingPacket incoming)
        {
            _packetQ.Add(incoming);
        }

        private void Consume()
        {
            foreach (var sweep in _packetQ.GetConsumingEnumerable())
            {
                //do stuff
                var worker = new IfftWorker();
                Trace.WriteLine(" Thread {0} picking up a pending ifft".With(Thread.CurrentThread.ManagedThreadId));
                worker.DoIfft(sweep);
            }
        }

        public int QueueCount
        {
            get
            {
                return _packetQ.Count;
            }
        }

        #region IDisposable Members
        public void Dispose()
        {
            _packetQ.CompleteAdding();
        }
        #endregion
    }
Does this look like a good solution? It seems to max out all cores - although I'm currently unsure how many workers I should spawn in my constructor.
That looks reasonable. I've found BlockingCollection to be quite fast. I use it to process tens of thousands of requests per second.
If your application is processor bound, then you probably don't want to create more workers than you have cores. Certainly you don't want to create a lot more workers than cores. On a quad-core machine, if you expect most of the time to be spent doing the FFTs, then four workers will eat all the CPU. More workers just means more thread context switches to deal with. The TPL will typically balance that for you, but there's no reason to create, say, 100 workers when you can't handle more than a handful.
I would suggest that you run tests with 3, 4, 5, 6, 7, and 8 workers. See which one gives you the best throughput.
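A rough way to run that comparison is sketched below. It is only an illustration: SimulatedTransform is a made-up stand-in for the real FFT work, and the item count and work size are arbitrary assumptions, so substitute your own IfftWorker call and realistic packets before trusting any numbers.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading.Tasks;

public static class WorkerCountBenchmark
{
    // Hypothetical stand-in for the real transform: burns CPU for a fixed amount of work.
    static double SimulatedTransform(int size)
    {
        double acc = 0;
        for (int i = 1; i <= size; i++)
            acc += Math.Sin(i) * Math.Cos(i);
        return acc;
    }

    // Pushes itemCount work items through a BlockingCollection drained by
    // workerCount consumers and returns the elapsed wall-clock time.
    public static TimeSpan Run(int workerCount, int itemCount, int workSize)
    {
        using (var queue = new BlockingCollection<int>())
        {
            var workers = new Task[workerCount];
            var sw = Stopwatch.StartNew();

            for (int w = 0; w < workerCount; w++)
            {
                workers[w] = Task.Factory.StartNew(() =>
                {
                    foreach (var size in queue.GetConsumingEnumerable())
                        SimulatedTransform(size);
                }, TaskCreationOptions.LongRunning);
            }

            for (int i = 0; i < itemCount; i++)
                queue.Add(workSize);

            queue.CompleteAdding();   // lets the consumers' foreach loops finish
            Task.WaitAll(workers);
            sw.Stop();
            return sw.Elapsed;
        }
    }

    public static void Main()
    {
        foreach (int workers in new[] { 3, 4, 5, 6, 7, 8 })
        {
            TimeSpan elapsed = Run(workers, 2000, 50000);
            Console.WriteLine("{0} workers: {1:F0} ms", workers, elapsed.TotalMilliseconds);
        }
    }
}
```

Run it a few times on the target hardware; a single pass is easily skewed by JIT warm-up and whatever else the machine is doing.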
I agree with Jim. Your approach looks really good. You are not going to get much better than this. I am not an FFT expert, but I am assuming these operations are nearly 100% CPU bound. If that is indeed the case, then a good first guess at the number of workers would be a direct 1-to-1 correlation with the number of cores in the machine. You can use Environment.ProcessorCount to get this value. You could experiment with a multiplier of, say, 2x or 4x, but again, if these operations are CPU bound then anything higher than 1x might just cause more overhead. Using Environment.ProcessorCount would also make your code more portable.
Another suggestion: let the TPL know that these are dedicated threads. You can do this by specifying the LongRunning option.

    public IncomingPacketQueue()
    {
        for (int i = 0; i < Environment.ProcessorCount; i++)
        {
            Task.Factory.StartNew(Consume, TaskCreationOptions.LongRunning);
        }
    }
Why not use Parallel.ForEach and let the TPL decide how many threads to create?
    Parallel.ForEach(BlockingCollectionExtensions.GetConsumingPartitioner(_packetQ),
        sweep => {
            //do stuff
            var worker = new IfftWorker();
            Trace.WriteLine(" Thread {0} picking up a pending ifft".With(Thread.CurrentThread.ManagedThreadId));
            worker.DoIfft(sweep);
        });

(GetConsumingPartitioner is part of the ParallelExtensionsExtras.)
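If you would rather not pull in the whole extras package, a partitioner along these lines can be written by hand. This is my own minimal sketch of the idea, not the library's source, and the class names here are made up. The point of it is that every Parallel.ForEach worker takes items one at a time straight from the collection, rather than going through the default enumerable partitioner, which as far as I know buffers items into chunks.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

public static class ConsumingPartitionerSketch
{
    // Illustrative extension method; the name mirrors the extras helper for readability only.
    public static Partitioner<T> GetConsumingPartitioner<T>(this BlockingCollection<T> collection)
    {
        if (collection == null) throw new ArgumentNullException("collection");
        return new ConsumingPartitioner<T>(collection);
    }

    private sealed class ConsumingPartitioner<T> : Partitioner<T>
    {
        private readonly BlockingCollection<T> _collection;

        public ConsumingPartitioner(BlockingCollection<T> collection)
        {
            _collection = collection;
        }

        // Parallel.ForEach requires dynamic partition support.
        public override bool SupportsDynamicPartitions
        {
            get { return true; }
        }

        // Static partitions are just N enumerators over the same consuming enumerable.
        public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
        {
            if (partitionCount < 1) throw new ArgumentOutOfRangeException("partitionCount");
            IEnumerable<T> shared = GetDynamicPartitions();
            return Enumerable.Range(0, partitionCount)
                             .Select(_ => shared.GetEnumerator())
                             .ToArray();
        }

        // Each enumerator removes items from the shared collection as it iterates,
        // so no item is handed to more than one worker.
        public override IEnumerable<T> GetDynamicPartitions()
        {
            return _collection.GetConsumingEnumerable();
        }
    }
}
```

Note that Parallel.ForEach over this partitioner only returns once CompleteAdding has been called and the collection has drained, so it needs to run on its own thread or task.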
Make the number of workers configurable. With too many workers it will get slower (as another poster indicated), so you need to find the sweet spot. A configurable value lets you do test runs to find the optimal setting and lets your program adapt to different types of hardware. You could certainly place this value in App.config and read it on startup.
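Reading the value could look something like the sketch below. The "WorkerCount" key and the WorkerSettings class are names I made up for illustration; the matching App.config entry would be an appSettings line such as `<add key="WorkerCount" value="4" />`. Falling back to Environment.ProcessorCount when the setting is missing or invalid is my assumption about a sensible default.

```csharp
using System;
using System.Configuration; // needs a reference to System.Configuration.dll

public static class WorkerSettings
{
    // Reads the hypothetical "WorkerCount" appSettings key from App.config.
    public static int GetWorkerCount()
    {
        return ParseWorkerCount(ConfigurationManager.AppSettings["WorkerCount"]);
    }

    // Uses the configured value when it is a positive integer;
    // otherwise falls back to one worker per core.
    public static int ParseWorkerCount(string raw)
    {
        int configured;
        if (int.TryParse(raw, out configured) && configured > 0)
            return configured;

        return Environment.ProcessorCount;
    }
}
```

The queue from the question would then be created with new IncomingPacketQueue(WorkerSettings.GetWorkerCount()).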
You could also try using PLINQ to parallelize the processing to see how it compares to the approach you're currently using. It has some tricks up its sleeve that can make it very efficient under certain circumstances.
_packetQ.GetConsumingEnumerable().AsParallel().ForAll(
sweep => new IfftWorker().DoIfft(sweep));