I have a Dataflow pipeline consisting of several blocks.
As elements flow through my processing pipeline, I want to group them by field A. To do this I have a BatchBlock with a high BoundedCapacity. I store my elements in it until I decide that they should be released, and then I invoke its TriggerBatch() method.
private void Forward(TStronglyTyped data)
{
    if (ShouldCreateNewGroup(data))
    {
        GroupingBlock.TriggerBatch();
    }
    GroupingBlock.SendAsync(data).Wait(SendTimeout);
}
This is how it looks. The problem is that the batch produced sometimes contains the next posted element, which shouldn't be there.
To illustrate:
BatchBlock.InputQueue = {A,A,A}
NextElement = B //we should trigger a Batch!
BatchBlock.TriggerBatch()
BatchBlock.SendAsync(B);
At this point I expect my batch to be {A,A,A}, but it is {A,A,A,B}. It is as if TriggerBatch() were asynchronous and SendAsync were in fact executed before the batch was actually made.
How can I solve this?
I obviously don't want to put Task.Wait(x) in there (I tried, and it works, but then performance is poor, of course).
I also encountered this issue by trying to call TriggerBatch in the wrong place. As mentioned, the SlidingWindow example using DataflowBlock.Encapsulate is the answer here, but it took some time to adapt, so I thought I'd share my completed block.
My ConditionalBatchBlock creates batches up to a maximum size, possibly sooner if a certain condition is met. In my specific scenario I needed to create batches of 100, but always start a new batch when certain changes in the data were detected.
The condition parameter may be simpler in your case. I needed to look at the queue as well as the current item to determine whether to create a new batch.
I used it like this:
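For readers who want a starting point, here is a minimal sketch of the same idea. It is not the ConditionalBatchBlock described above: the names ConditionalBatching, Create, startNewBatch and maxBatchSize are made up for illustration, and it assumes the System.Threading.Tasks.Dataflow package is referenced. An ActionBlock collects items in a list and posts finished batches to a BufferBlock, and DataflowBlock.Encapsulate exposes the pair as a single propagator block. Because an ActionBlock handles one message at a time by default, the decision to close a batch is always made before the next element is added.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ConditionalBatching
{
    // Hypothetical helper, not the original ConditionalBatchBlock.
    // startNewBatch(pending, next) returns true when `next` must open a new batch.
    public static IPropagatorBlock<T, T[]> Create<T>(
        int maxBatchSize, Func<IReadOnlyList<T>, T, bool> startNewBatch)
    {
        var pending = new List<T>(maxBatchSize);
        var output = new BufferBlock<T[]>();

        // Posts the pending items as one batch, if there are any.
        void Flush()
        {
            if (pending.Count == 0) return;
            output.Post(pending.ToArray());
            pending.Clear();
        }

        // Default MaxDegreeOfParallelism is 1, so the boundary check and
        // the append for one item finish before the next item is handled.
        var input = new ActionBlock<T>(item =>
        {
            if (pending.Count > 0 && startNewBatch(pending, item)) Flush();
            pending.Add(item);
            if (pending.Count >= maxBatchSize) Flush();
        });

        // Emit the final partial batch, then propagate completion or fault.
        input.Completion.ContinueWith(t =>
        {
            if (t.IsFaulted) ((IDataflowBlock)output).Fault(t.Exception);
            else { Flush(); output.Complete(); }
        });

        return DataflowBlock.Encapsulate(input, output);
    }

    public static async Task Main()
    {
        // Start a new batch whenever the value differs from the last pending one.
        var block = Create<string>(100, (queue, next) => queue[queue.Count - 1] != next);
        var printer = new ActionBlock<string[]>(b => Console.WriteLine(string.Join(",", b)));
        block.LinkTo(printer, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var s in new[] { "A", "A", "A", "B" }) block.Post(s);
        block.Complete();
        await printer.Completion;
    }
}
```

With the sample input A, A, A, B this should print A,A,A and then B, which is the grouping the question expected. The sketch leaves the output buffer unbounded; if you need back-pressure you would have to add a BoundedCapacity and switch from Post to SendAsync yourself.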