Get batches of messages as available

Posted 2019-06-06 09:51

I am trying to achieve the following behaviour using the Task Parallel Library:

As messages arrive I would like to process them sequentially but in groups. So when the first message arrives it should be processed immediately. If 2 messages come in while the first is being processed then they should be processed in a group of 2.

I can almost get what I want using a BatchBlock linked to an ActionBlock:

var batchBlock = new BatchBlock<int>(100);

var actionBlock = new ActionBlock<int[]>(list =>
    {
        // do work

        // now trigger
        batchBlock.TriggerBatch();
    });

batchBlock.LinkTo(actionBlock);

The problem with the code above is that if an item arrives after the TriggerBatch() call, it has to wait for the batch to fill up. If I call TriggerBatch() after each post instead, the ActionBlock always receives single messages.

2 answers
时光不老,我们不散
#2 · 2019-06-06 10:27

You can also use a Timer that calls TriggerBatch() every 10 seconds.
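
A minimal sketch of that timer idea, assuming System.Threading.Timer and the BatchBlock/ActionBlock pair from the question. The class name TimerBatchingSketch, the five posted items, and the 11-second wait are illustrative only.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class TimerBatchingSketch // hypothetical name, for illustration only
{
    static async Task Main()
    {
        var batchBlock = new BatchBlock<int>(100);
        var actionBlock = new ActionBlock<int[]>(
            batch => Console.WriteLine($"Processing {batch.Length} item(s)"));

        batchBlock.LinkTo(
            actionBlock,
            new DataflowLinkOptions { PropagateCompletion = true });

        // Every 10 seconds, ask the block to emit whatever it has queued,
        // even if fewer than 100 items have arrived.
        var interval = TimeSpan.FromSeconds(10);
        using (var timer = new Timer(
            _ => batchBlock.TriggerBatch(), null, interval, interval))
        {
            for (int i = 0; i < 5; i++)
                batchBlock.Post(i);

            // Keep the process alive long enough for one timer tick.
            await Task.Delay(TimeSpan.FromSeconds(11));
        }

        // Completing the block emits any remaining items as a final batch.
        batchBlock.Complete();
        await actionBlock.Completion;
    }
}
```

Note the trade-off: a message can wait up to one full interval before it is processed, so this does not give the "process the first message immediately" behaviour asked for in the question.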

Melony?
#3 · 2019-06-06 10:33

Instead of BatchBlock, you could use a BufferBlock with a Task that receives items from it and resends them in batches to the target, according to your logic. Because you need to try to send a message containing a batch, and cancel that send if another item comes in, the target block (actionBlock in your sample) has to have BoundedCapacity set to 1.

So, what you do is that you first receive something. When you have that, you start sending asynchronously and you also try to receive more items. If sending completes first, you start over. If receiving completes first, you cancel sending, add the received items to the batch, and then start both asynchronous actions again.

The actual code is a bit more complicated, because it needs to deal with some corner cases (receiving and sending complete at the same time; sending couldn't be cancelled; receiving completed because the source was completed; exceptions):

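using System;
using System.Collections.Generic;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// The methods below must be placed in a static class,
// because EnsureCancelled and ReceiveAllInto are extension methods.

public static ITargetBlock<T> CreateBatchingWrapper<T>(
    ITargetBlock<IReadOnlyList<T>> target)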
{
    // target should have BoundedCapacity == 1,
    // but there is no way to check for that

    var source = new BufferBlock<T>();

    Task.Run(() => BatchItems(source, target));

    return source;
}

private static async Task BatchItems<T>(
    IReceivableSourceBlock<T> source, ITargetBlock<IReadOnlyList<T>> target)
{
    try
    {
        while (true)
        {
            var messages = new List<T>();

            // wait for first message in batch
            if (!await source.OutputAvailableAsync())
            {
                // source was completed, complete target and return
                target.Complete();
                return;
            }

            // receive all there is right now
            source.ReceiveAllInto(messages);

            // try sending what we've got
            var sendCancellation = new CancellationTokenSource();
            var sendTask = target.SendAsync(messages, sendCancellation.Token);

            var outputAvailableTask = source.OutputAvailableAsync();

            while (true)
            {
                await Task.WhenAny(sendTask, outputAvailableTask);

                // got another message, try cancelling send
                if (outputAvailableTask.IsCompleted
                    && outputAvailableTask.Result)
                {
                    sendCancellation.Cancel();

                    // cancellation wasn't successful
                    // and the message was received, start another batch
                    if (!await sendTask.EnsureCancelled() && sendTask.Result)
                        break;

                    // send was cancelled, receive messages
                    source.ReceiveAllInto(messages);

                    // restart both Tasks
                    sendCancellation = new CancellationTokenSource();
                    sendTask = target.SendAsync(
                        messages, sendCancellation.Token);
                    outputAvailableTask = source.OutputAvailableAsync();
                }
                else
                {
                    // we get here in three situations:
                    // 1. send was completed successfully
                    // 2. send failed
                    // 3. input has completed
                    // in cases 2 and 3, this await is necessary
                    // in case 1, it's harmless
                    await sendTask;

                    break;
                }
            }
        }
    }
    catch (Exception e)
    {
        source.Fault(e);
        target.Fault(e);
    }
}

/// <summary>
/// Returns a Task that completes when the given Task completes.
/// The Result is true if the Task was cancelled,
/// and false if it completed successfully.
/// If the Task was faulted, the returned Task is faulted too.
/// </summary>
public static Task<bool> EnsureCancelled(this Task task)
{
    return task.ContinueWith(t =>
    {
        if (t.IsCanceled)
            return true;
        if (t.IsFaulted)
        {
            // rethrow the exception
            ExceptionDispatchInfo.Capture(t.Exception.InnerException)
                .Throw();
        }

        // completed successfully
        return false;
    });
}

public static void ReceiveAllInto<T>(
    this IReceivableSourceBlock<T> source, List<T> targetCollection)
{
    // TryReceiveAll would be best suited for this, except it's bugged
    // (see http://connect.microsoft.com/VisualStudio/feedback/details/785185)
    T item;
    while (source.TryReceive(out item))
        targetCollection.Add(item);
}
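
For illustration, the wrapper could be used roughly as follows. This is a sketch that assumes the methods above were placed in a static class named BatchingExtensions (a hypothetical name) and that the target is an ActionBlock with BoundedCapacity set to 1, as required above. The delays only simulate work and arrival times; actual batch sizes depend on timing.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class BatchingWrapperUsage // hypothetical name, for illustration only
{
    static async Task Main()
    {
        // BoundedCapacity = 1 lets the wrapper's pending SendAsync be
        // cancelled while the previous batch is still being processed.
        var actionBlock = new ActionBlock<IReadOnlyList<int>>(
            async batch =>
            {
                Console.WriteLine($"Processing batch of {batch.Count}");
                await Task.Delay(100); // simulate work
            },
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        // BatchingExtensions is the assumed name of the static class
        // that holds CreateBatchingWrapper and its helpers.
        ITargetBlock<int> input =
            BatchingExtensions.CreateBatchingWrapper<int>(actionBlock);

        for (int i = 0; i < 10; i++)
        {
            input.Post(i);
            await Task.Delay(30); // items arrive faster than they are processed
        }

        // Completing the input makes BatchItems complete the target
        // once everything received so far has been sent.
        input.Complete();
        await actionBlock.Completion;
    }
}
```

Await the target's Completion rather than the wrapper's: the BufferBlock returned by CreateBatchingWrapper completes as soon as it has been drained, which can happen before the last batch has been processed.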