I am trying to achieve the following behaviour using the Task Parallel Library:
As messages arrive I would like to process them sequentially but in groups. So when the first message arrives it should be processed immediately. If 2 messages come in while the first is being processed then they should be processed in a group of 2.
I can almost get what I want using a BatchBlock linked to an ActionBlock:
var batchBlock = new BatchBlock<int>(100);
var actionBlock = new ActionBlock<int[]>(list =>
{
    // do work
    // now trigger the next batch
    batchBlock.TriggerBatch();
});
batchBlock.LinkTo(actionBlock);
The problem with the code above is that if an item arrives after the TriggerBatch() call, it has to wait for the batch to fill up. If I trigger the batch after each post instead, the ActionBlock always receives single messages.
You can also use a Timer that calls TriggerBatch() every 10 seconds.
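A minimal sketch of the timer idea, assuming System.Threading.Timer is the timer meant here. The PeriodicFlush class and its Start method are hypothetical names used only for illustration:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

static class PeriodicFlush
{
    // Hypothetical helper: returns a timer that asks the BatchBlock to emit
    // whatever it currently holds once per period. TriggerBatch() does
    // nothing when the block is empty.
    public static Timer Start<T>(BatchBlock<T> batchBlock, TimeSpan period)
    {
        return new Timer(_ => batchBlock.TriggerBatch(), null, period, period);
    }
}

// Usage (keep the Timer referenced and dispose it when you are done):
//   var batchBlock = new BatchBlock<int>(100);
//   using var timer = PeriodicFlush.Start(batchBlock, TimeSpan.FromSeconds(10));
```

Note that with this approach a message can wait up to one full period before it is processed, so it only approximates the "process the first message immediately" behaviour from the question.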
Instead of BatchBlock, you could use BufferBlock with a Task that receives items from it and resends them in batches to the target, according to your logic. Because you need to try to send a message containing a batch, and cancel it if another item comes in, the target block (actionBlock in your sample) has to have BoundedCapacity set to 1.
So, what you do is that you first receive something. When you have that, you start sending asynchronously and you also try to receive more items. If sending completes first, you start over. If receiving completes first, you cancel sending, add the received items to the batch, and then start both asynchronous actions again.
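The send-and-receive race described above could be sketched roughly as follows. This is a simplified illustration, not a complete implementation: BatchForwarder and ForwardAsync are hypothetical names, exceptions and faulted blocks are not handled, and it uses OutputAvailableAsync plus TryReceiveAll instead of a cancellable receive so that no item is lost when the wait is abandoned.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class BatchForwarder
{
    // Hypothetical helper: forwards items from source to target in batches.
    // The target is assumed to have BoundedCapacity = 1.
    public static async Task ForwardAsync<T>(
        IReceivableSourceBlock<T> source, ITargetBlock<T[]> target)
    {
        var batch = new List<T>();
        bool sourceDone = false;
        while (!sourceDone || batch.Count > 0)
        {
            if (batch.Count == 0 && !await source.OutputAvailableAsync())
            {
                sourceDone = true; // source completed and is empty
                continue;
            }
            // Move everything that is currently buffered into the batch.
            if (source.TryReceiveAll(out var items)) batch.AddRange(items);
            if (batch.Count == 0) continue;

            using var cts = new CancellationTokenSource();
            Task<bool> send = target.SendAsync(batch.ToArray(), cts.Token);
            if (!sourceDone)
            {
                Task<bool> more = source.OutputAvailableAsync();
                if (await Task.WhenAny(send, more) == more)
                {
                    // New input arrived first: withdraw the pending send.
                    if (await more) cts.Cancel(); else sourceDone = true;
                }
            }
            try
            {
                // The send may still have succeeded if the cancel came too late.
                if (await send) batch.Clear();
                else return; // target declined the batch for good
            }
            catch (OperationCanceledException)
            {
                // Send withdrawn: loop again and retry with a larger batch.
            }
        }
        target.Complete(); // assumption: completion should be propagated
    }
}
```

You would start it with something like `BatchForwarder.ForwardAsync(bufferBlock, actionBlock)` after creating the ActionBlock with `new ExecutionDataflowBlockOptions { BoundedCapacity = 1 }`, and post incoming messages to the BufferBlock.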
The actual code is a bit more complicated, because it needs to deal with some corner cases (receiving and sending complete at the same time; sending couldn't be canceled; receiving completed because the source was completed; exceptions):