I have used BlockingCollection many times to implement the producer/consumer pattern, but I have experienced bad performance with extremely granular data because of the associated overhead. This usually forces me to improvise by chunkifying/partitioning my data, in other words using a BlockingCollection<T[]> instead of a BlockingCollection<T>. Here is a recent example. This works, but it's ugly and error-prone: I end up using nested loops at both the producer and the consumer, and I must remember to Add what is left at the end of a producer's workload. So I had the idea of implementing a chunky BlockingCollection that handles all these complications internally and exposes the same simple interface as the existing BlockingCollection. My problem is that I haven't yet managed to match the performance of the complex manual partitioning. My best attempt still pays a performance tax of around +100% for extremely granular data (basically just integer values). So I would like to present here what I have done so far, hoping for advice that will help me close the performance gap.
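For illustration, the manual partitioning described above might look roughly like the following sketch. It is not the code from the linked example; the ManualChunkingDemo class and its Run method are hypothetical names, and a single producer and a single consumer are assumed.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class ManualChunkingDemo
{
    // Hypothetical helper: produces the integers [0, itemCount) in arrays of
    // up to chunkSize items and returns the sum computed by the consumer.
    public static long Run(int itemCount, int chunkSize)
    {
        var collection = new BlockingCollection<int[]>();

        var producer = Task.Run(() =>
        {
            var buffer = new int[chunkSize];
            int count = 0;
            for (int i = 0; i < itemCount; i++)
            {
                buffer[count++] = i;
                if (count == chunkSize)
                {
                    collection.Add(buffer);
                    buffer = new int[chunkSize]; // never reuse a published array
                    count = 0;
                }
            }
            // The easy-to-forget step: flush the partially filled last chunk.
            if (count > 0) collection.Add(buffer.Take(count).ToArray());
            collection.CompleteAdding();
        });

        long sum = 0;
        foreach (int[] chunk in collection.GetConsumingEnumerable())
        {
            // Nested loop on the consumer side.
            foreach (int item in chunk) sum += item;
        }
        producer.Wait();
        return sum;
    }
}
```

The chunk bookkeeping (filling, publishing, flushing the remainder) is exactly what the wrapper classes below try to hide behind Add and CompleteAdding.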
My best attempt uses a ThreadLocal<List<T>>, so that each thread works on a dedicated chunk, removing any need for locks.
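using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public class ChunkyBlockingCollection1<T>
{
    private readonly BlockingCollection<T[]> _blockingCollection;
    public readonly int _chunkSize;
    // One private buffer per producer thread. trackAllValues is true so that
    // CompleteAdding can reach the buffers of all threads.
    private readonly ThreadLocal<List<T>> _chunk;

    public ChunkyBlockingCollection1(int chunkSize)
    {
        _blockingCollection = new BlockingCollection<T[]>();
        _chunkSize = chunkSize;
        _chunk = new ThreadLocal<List<T>>(() => new List<T>(chunkSize), true);
    }

    public void Add(T item)
    {
        var chunk = _chunk.Value;
        chunk.Add(item);
        if (chunk.Count >= _chunkSize)
        {
            // The buffer is full: publish a copy and reuse the list.
            _blockingCollection.Add(chunk.ToArray());
            chunk.Clear();
        }
    }

    // Must be called after all producers have finished adding, because it
    // reads the other threads' buffers without synchronization.
    public void CompleteAdding()
    {
        var chunks = _chunk.Values.ToArray();
        foreach (var chunk in chunks)
        {
            if (chunk.Count == 0) continue; // Don't publish empty chunks
            _blockingCollection.Add(chunk.ToArray());
            chunk.Clear();
        }
        _blockingCollection.CompleteAdding();
    }

    public IEnumerable<T> GetConsumingEnumerable()
    {
        // Flatten the chunks back into a sequence of single items.
        foreach (var chunk in _blockingCollection.GetConsumingEnumerable())
        {
            for (int i = 0; i < chunk.Length; i++)
            {
                yield return chunk[i];
            }
        }
    }
}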
My second best attempt uses a single List<T> as the chunk, which is accessed by all threads in a thread-safe manner using a lock. Surprisingly, this is only slightly slower than the ThreadLocal<List<T>> solution.
using System.Collections.Concurrent;
using System.Collections.Generic;

public class ChunkyBlockingCollection2<T>
{
    private readonly BlockingCollection<T[]> _blockingCollection;
    public readonly int _chunkSize;
    // Single buffer shared by all producers, protected by _locker.
    private readonly List<T> _chunk;
    private readonly object _locker = new object();

    public ChunkyBlockingCollection2(int chunkSize)
    {
        _blockingCollection = new BlockingCollection<T[]>();
        _chunkSize = chunkSize;
        _chunk = new List<T>(chunkSize);
    }

    public void Add(T item)
    {
        lock (_locker)
        {
            _chunk.Add(item);
            if (_chunk.Count >= _chunkSize)
            {
                // The buffer is full: publish a copy and reuse the list.
                _blockingCollection.Add(_chunk.ToArray());
                _chunk.Clear();
            }
        }
    }

    public void CompleteAdding()
    {
        lock (_locker)
        {
            // Flush what is left, but don't publish an empty chunk.
            if (_chunk.Count > 0)
            {
                _blockingCollection.Add(_chunk.ToArray());
                _chunk.Clear();
            }
        }
        _blockingCollection.CompleteAdding();
    }

    public IEnumerable<T> GetConsumingEnumerable()
    {
        // Flatten the chunks back into a sequence of single items.
        foreach (var chunk in _blockingCollection.GetConsumingEnumerable())
        {
            for (int i = 0; i < chunk.Length; i++)
            {
                yield return chunk[i];
            }
        }
    }
}
I have also tried using a ConcurrentBag<T> as the chunk, which resulted in bad performance and a correctness issue (because I didn't use a lock). Another attempt was replacing the lock (_locker) with a SpinLock, with even worse performance. The locking is clearly the root of my problems, because if I remove it completely my class achieves optimal performance. Of course, removing the lock fails miserably with more than one producer.
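Since a missing lock shows up as lost or duplicated items rather than as a crash, it can help to check correctness and timing in the same run. The following is a minimal sketch of such a harness, not the benchmark actually used for the numbers above. ProducerConsumerHarness and Run are hypothetical names, and the collection under test is passed in as three delegates so that any of the classes in this post (or a plain BlockingCollection<int>) can be plugged in.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

public static class ProducerConsumerHarness
{
    // Hypothetical helper: starts `producers` tasks that each add the
    // integers [0, itemsPerProducer), calls completeAdding once they have all
    // finished, and drains the collection with a single consumer.
    // Returns the consumed item count, their sum, and the elapsed time.
    public static (long Count, long Sum, TimeSpan Elapsed) Run(
        Action<int> add,
        Action completeAdding,
        Func<IEnumerable<int>> getConsumingEnumerable,
        int producers,
        int itemsPerProducer)
    {
        var stopwatch = Stopwatch.StartNew();

        var consumer = Task.Run(() =>
        {
            long count = 0, sum = 0;
            foreach (int item in getConsumingEnumerable())
            {
                count++;
                sum += item;
            }
            return (count, sum);
        });

        var producerTasks = Enumerable.Range(0, producers)
            .Select(_ => Task.Run(() =>
            {
                for (int i = 0; i < itemsPerProducer; i++) add(i);
            }))
            .ToArray();

        Task.WaitAll(producerTasks);
        completeAdding(); // only after every producer has finished
        var (consumedCount, consumedSum) = consumer.Result;
        stopwatch.Stop();
        return (consumedCount, consumedSum, stopwatch.Elapsed);
    }
}
```

With a plain BlockingCollection<int> named baseline, the call would be ProducerConsumerHarness.Run(baseline.Add, baseline.CompleteAdding, baseline.GetConsumingEnumerable, 2, 1000000). The expected count is producers * itemsPerProducer, so any other value indicates lost or duplicated items.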
Update: I implemented the lock-free solution suggested by Nick, making heavy use of the Interlocked class. With one producer the performance is slightly better, but it becomes much worse with two or more producers. The number of collisions that cause the threads to spin is high and varies from run to run. The implementation is also very tricky, making bugs easy to introduce.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public class ChunkyBlockingCollection3<T>
{
    private readonly BlockingCollection<(T[], int)> _blockingCollection;
    public readonly int _chunkSize;
    private T[] _array;                 // Chunk currently being filled
    private int _arrayCount;            // Number of slots reserved in _array
    private int _arrayCountOfCompleted; // Number of slots actually written
    private T[] _emptyArray;            // Spare array, ready to replace _array

    public ChunkyBlockingCollection3(int chunkSize)
    {
        _chunkSize = chunkSize;
        _blockingCollection = new BlockingCollection<(T[], int)>();
        _array = new T[chunkSize];
        _arrayCount = 0;
        _arrayCountOfCompleted = 0;
        _emptyArray = new T[chunkSize];
    }

    public void Add(T item)
    {
        while (true) // Spin
        {
            // Reserve a slot by atomically incrementing _arrayCount.
            int count = _arrayCount;
            while (true) // Spin
            {
                int previous = count;
                count++;
                int result = Interlocked.CompareExchange(ref _arrayCount,
                    count, previous);
                if (result == previous) break;
                count = result;
            }
            var array = Interlocked.CompareExchange(ref _array, null, null);
            if (array == null) throw new InvalidOperationException(
                "The collection has been marked as complete.");
            if (count <= _chunkSize)
            {
                // There is empty space in the array
                array[count - 1] = item;
                Interlocked.Increment(ref _arrayCountOfCompleted);
                break; // Adding is completed
            }
            if (count == _chunkSize + 1)
            {
                // Array is full. Push it to the BlockingCollection.
                // First wait until all reserved slots have been written.
                while (Interlocked.CompareExchange(
                    ref _arrayCountOfCompleted, 0, 0) < _chunkSize) { } // Spin
                _blockingCollection.Add((array, _chunkSize));
                T[] newArray;
                while ((newArray = Interlocked.CompareExchange(
                    ref _emptyArray, null, null)) == null) { } // Spin
                Interlocked.Exchange(ref _array, newArray);
                Interlocked.Exchange(ref _emptyArray, null);
                Interlocked.Exchange(ref _arrayCountOfCompleted, 0);
                Interlocked.Exchange(ref _arrayCount, 0); // Unlock other threads
                Interlocked.Exchange(ref _emptyArray, new T[_chunkSize]);
            }
            else
            {
                // Wait for the other thread to replace the full array with a new one.
                while (Interlocked.CompareExchange(
                    ref _arrayCount, 0, 0) > _chunkSize) { } // Spin
            }
            // The item has not been stored yet. Retry with the new array.
        }
    }

    public void CompleteAdding()
    {
        var array = Interlocked.Exchange(ref _array, null);
        if (array != null)
        {
            int count = Interlocked.Exchange(ref _arrayCount, -1);
            // Wait until all reserved slots have been written.
            while (Interlocked.CompareExchange(
                ref _arrayCountOfCompleted, 0, 0) < count) { } // Spin
            _blockingCollection.Add((array, count));
            _blockingCollection.CompleteAdding();
        }
    }

    public IEnumerable<T> GetConsumingEnumerable()
    {
        // Only the first `count` slots of each array contain items.
        foreach (var (array, count) in _blockingCollection.GetConsumingEnumerable())
        {
            for (int i = 0; i < count; i++)
            {
                yield return array[i];
            }
        }
    }
}
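As a side note on the slot reservation step in Add above: a compare-and-swap retry loop that always adds 1 to the current value has the same effect as a single Interlocked.Increment call, which also returns the incremented value. The sketch below isolates just that step to show that concurrent callers receive distinct slots. SlotReservationDemo and Run are hypothetical names; this does not address the spinning around the array swap, and no claim is made here about how it would affect the measured performance.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SlotReservationDemo
{
    // Hypothetical helper: `threads` tasks each reserve `perThread` slots in a
    // shared array via Interlocked.Increment and mark them. Returns true when
    // every slot was reserved exactly once.
    public static bool Run(int threads, int perThread)
    {
        var hits = new int[threads * perThread];
        int next = 0; // number of slots reserved so far

        var tasks = Enumerable.Range(0, threads)
            .Select(_ => Task.Run(() =>
            {
                for (int i = 0; i < perThread; i++)
                {
                    // Atomically reserve a slot; the return value is the
                    // incremented counter, so the slot index is one less.
                    int slot = Interlocked.Increment(ref next) - 1;
                    hits[slot]++; // no other task received this slot
                }
            }))
            .ToArray();

        Task.WaitAll(tasks);
        return hits.All(h => h == 1);
    }
}
```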