I am trying to achieve the following behaviour using the Task Parallel Library:
As messages arrive I would like to process them sequentially but in groups. So when the first message arrives it should be processed immediately. If 2 messages come in while the first is being processed then they should be processed in a group of 2.
I can almost get what I want using a BatchBlock
linked to an ActionBlock
var batchBlock = new BatchBlock<int>(100);
var actionBlock = new ActionBlock<int[]>(list =>
{
// do work
// now trigger
batchBlock.TriggerBatch();
});
batchBlock.LinkTo(actionBlock);
The problem with the code above is that if an item arrives after the TriggerBatch()
call then it needs to wait for the batch to fill up. If I trigger batch after each post instead then the ActionBlock
always receives single messages.
Instead of BatchBlock
, you could use BufferBlock
with a Task
the receives items from it and resends them in batches to the target, according to your logic. Because you need to try to send a message containing a batch, and cancel it if another item comes in, the target block (actionBlock
in your sample) has to have BoundedCapacity
set to 1.
So, what you do is that you first receive something. When you have that, you start sending asynchronously and you also try to receive more items. If sending completes first, you start over. If receiving completes first, you cancel sending, add the received items to the batch, and then start both asynchronous actions again.
The actual code is a bit more complicated, because it needs to deal with some corner cases (receiving and sending complete at the same time; sending couldn't be canceled; receiving completed, because the whole was completed; exceptions):
public static ITargetBlock<T> CreateBatchingWrapper<T>(
ITargetBlock<IReadOnlyList<T>> target)
{
// target should have BoundedCapacity == 1,
// but there is no way to check for that
var source = new BufferBlock<T>();
Task.Run(() => BatchItems(source, target));
return source;
}
private static async Task BatchItems<T>(
IReceivableSourceBlock<T> source, ITargetBlock<IReadOnlyList<T>> target)
{
try
{
while (true)
{
var messages = new List<T>();
// wait for first message in batch
if (!await source.OutputAvailableAsync())
{
// source was completed, complete target and return
target.Complete();
return;
}
// receive all there is right now
source.ReceiveAllInto(messages);
// try sending what we've got
var sendCancellation = new CancellationTokenSource();
var sendTask = target.SendAsync(messages, sendCancellation.Token);
var outputAvailableTask = source.OutputAvailableAsync();
while (true)
{
await Task.WhenAny(sendTask, outputAvailableTask);
// got another message, try cancelling send
if (outputAvailableTask.IsCompleted
&& outputAvailableTask.Result)
{
sendCancellation.Cancel();
// cancellation wasn't successful
// and the message was received, start another batch
if (!await sendTask.EnsureCancelled() && sendTask.Result)
break;
// send was cancelled, receive messages
source.ReceiveAllInto(messages);
// restart both Tasks
sendCancellation = new CancellationTokenSource();
sendTask = target.SendAsync(
messages, sendCancellation.Token);
outputAvailableTask = source.OutputAvailableAsync();
}
else
{
// we get here in three situations:
// 1. send was completed succesfully
// 2. send failed
// 3. input has completed
// in cases 2 and 3, this await is necessary
// in case 1, it's harmless
await sendTask;
break;
}
}
}
}
catch (Exception e)
{
source.Fault(e);
target.Fault(e);
}
}
/// <summary>
/// Returns a Task that completes when the given Task completes.
/// The Result is true if the Task was cancelled,
/// and false if it completed successfully.
/// If the Task was faulted, the returned Task is faulted too.
/// </summary>
public static Task<bool> EnsureCancelled(this Task task)
{
return task.ContinueWith(t =>
{
if (t.IsCanceled)
return true;
if (t.IsFaulted)
{
// rethrow the exception
ExceptionDispatchInfo.Capture(task.Exception.InnerException)
.Throw();
}
// completed successfully
return false;
});
}
public static void ReceiveAllInto<T>(
this IReceivableSourceBlock<T> source, List<T> targetCollection)
{
// TryReceiveAll would be best suited for this, except it's bugged
// (see http://connect.microsoft.com/VisualStudio/feedback/details/785185)
T item;
while (source.TryReceive(out item))
targetCollection.Add(item);
}
You can also use Timer; which will Trigger Batch on every 10 seconds