I have a Dataflow pipeline consisting of several blocks.
When elements flow through my processing pipeline, I want to group them by field A. To do this I have a BatchBlock with a high BoundedCapacity. I store my elements in it until I decide that they should be released, and then I invoke the TriggerBatch() method.
This is how it looks:

    private void Forward(TStronglyTyped data)
    {
        // release the accumulated group before accepting the first element of the next one
        if (ShouldCreateNewGroup(data))
        {
            GroupingBlock.TriggerBatch();
        }
        GroupingBlock.SendAsync(data).Wait(SendTimeout);
    }
The problem is that the produced batch sometimes contains the next posted element, which shouldn't be there.
To illustrate:
    BatchBlock.InputQueue = {A,A,A}
    NextElement = B // we should trigger a batch!
    BatchBlock.TriggerBatch();
    BatchBlock.SendAsync(B);
At this point I expect my batch to be {A,A,A}, but it is {A,A,A,B}. It is as if TriggerBatch() were asynchronous and SendAsync were in fact executed before the batch was actually made.
How can I solve this?
I obviously don't want to put Task.Wait(x) in there (I tried, and it works, but then performance is poor, of course).
I also encountered this issue by trying to call TriggerBatch in the wrong place. As mentioned, the SlidingWindow example using DataflowBlock.Encapsulate is the answer here, but it took some time to adapt, so I thought I'd share my completed block.
My ConditionalBatchBlock creates batches up to a maximum size, or sooner if a certain condition is met. In my specific scenario I needed to create batches of 100, but always start a new batch when certain changes in the data were detected.
    public static IPropagatorBlock<T, T[]> CreateConditionalBatchBlock<T>(int batchSize, Func<Queue<T>, T, bool> condition)
    {
        var queue = new Queue<T>();
        var source = new BufferBlock<T[]>();
        // the ActionBlock handles one item at a time by default, so the queue needs no locking
        var target = new ActionBlock<T>(async item =>
        {
            // start a new batch if required by the condition
            if (condition(queue, item))
            {
                await source.SendAsync(queue.ToArray());
                queue.Clear();
            }
            queue.Enqueue(item);
            // always send a batch when the max size has been reached
            if (queue.Count == batchSize)
            {
                await source.SendAsync(queue.ToArray());
                queue.Clear();
            }
        });
        // when the input side finishes, send any remaining items and complete the output side
        target.Completion.ContinueWith(async t =>
        {
            if (t.IsFaulted)
            {
                // pass the failure downstream instead of completing normally
                ((IDataflowBlock)source).Fault(t.Exception);
                return;
            }
            if (queue.Count > 0)
                await source.SendAsync(queue.ToArray());
            source.Complete();
        });
        return DataflowBlock.Encapsulate(target, source);
    }
The condition parameter may be simpler in your case. I needed to look at the queue as well as the current item to determine whether to create a new batch.
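For the simpler case of grouping consecutive items by a single field, the condition only has to compare the incoming item with an item that is already queued. Here is a minimal sketch of that idea. The helper name BatchConditions.KeyChanged and the keySelector parameter are hypothetical and not part of the block above.

```csharp
using System;
using System.Collections.Generic;

public static class BatchConditions
{
    // Hypothetical helper: builds a condition that starts a new batch when the
    // incoming item's key differs from the key of the oldest queued item.
    // An empty queue never triggers a batch, so no empty arrays are sent.
    public static Func<Queue<T>, T, bool> KeyChanged<T, TKey>(Func<T, TKey> keySelector)
    {
        var comparer = EqualityComparer<TKey>.Default;
        return (queue, item) =>
            queue.Count > 0 &&
            !comparer.Equals(keySelector(queue.Peek()), keySelector(item));
    }
}
```

Because every queued item shares the same key under this condition, comparing against queue.Peek() is enough. The result can be passed directly as the condition argument of CreateConditionalBatchBlock.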
I used the block like this:
    public async Task RunExampleAsync<T>()
    {
        var conditionalBatchBlock = CreateConditionalBatchBlock<T>(100, (queue, currentItem) => ShouldCreateNewBatch(queue, currentItem));
        var actionBlock = new ActionBlock<T[]>(async x => await PerformActionAsync(x));
        conditionalBatchBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });
        await ReadDataAsync<T>(conditionalBatchBlock);
        await actionBlock.Completion;
    }
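To check the behaviour against the {A,A,A} then B scenario from the question, here is a small end-to-end sketch. It assumes the System.Threading.Tasks.Dataflow library is referenced. The GroupingDemo class, the fixed input "AAABBC" and the collecting sink are made up for illustration, and the block is repeated in compact form only so that the sample compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class GroupingDemo
{
    // Compact copy of the conditional batch block (no fault handling) for this demo.
    static IPropagatorBlock<T, T[]> CreateConditionalBatchBlock<T>(
        int batchSize, Func<Queue<T>, T, bool> condition)
    {
        var queue = new Queue<T>();
        var source = new BufferBlock<T[]>();
        var target = new ActionBlock<T>(async item =>
        {
            if (condition(queue, item))
            {
                await source.SendAsync(queue.ToArray());
                queue.Clear();
            }
            queue.Enqueue(item);
            if (queue.Count == batchSize)
            {
                await source.SendAsync(queue.ToArray());
                queue.Clear();
            }
        });
        target.Completion.ContinueWith(async t =>
        {
            if (queue.Count > 0)
                await source.SendAsync(queue.ToArray());
            source.Complete();
        });
        return DataflowBlock.Encapsulate(target, source);
    }

    // Sends a fixed sequence through the block and returns each batch as a string.
    public static async Task<List<string>> RunAsync()
    {
        // Start a new batch whenever the incoming character differs from the queued ones.
        var block = CreateConditionalBatchBlock<char>(
            100, (queue, item) => queue.Count > 0 && queue.Peek() != item);

        var batches = new List<string>();
        var sink = new ActionBlock<char[]>(batch => batches.Add(new string(batch)));
        block.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var c in "AAABBC")
            await block.SendAsync(c);

        block.Complete();       // no more input: flushes the last partial batch
        await sink.Completion;  // wait until every batch has been collected
        return batches;         // "AAA", "BB", "C"
    }
}
```

The decision to close a batch and the enqueueing of the new item happen inside the same ActionBlock delegate, which runs one item at a time by default, so the B that triggers the split cannot end up in the batch of A items.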