TPL Dataflow: Bounded capacity and waiting for com

2019-02-11 08:57发布

问题:

Below I have replicated a real life scenario as a LINQPad script for the sake of simplicity:

var total = 1 * 1000 * 1000;
var cts = new CancellationTokenSource();
var threads = Environment.ProcessorCount;
int capacity = 10;

var edbOptions = new ExecutionDataflowBlockOptions{BoundedCapacity = capacity, CancellationToken = cts.Token, MaxDegreeOfParallelism = threads};
var dbOptions = new DataflowBlockOptions {BoundedCapacity = capacity, CancellationToken = cts.Token};
var gdbOptions = new GroupingDataflowBlockOptions {BoundedCapacity = capacity, CancellationToken = cts.Token};
var dlOptions = new DataflowLinkOptions {PropagateCompletion = true};

var counter1 = 0;
var counter2 = 0;

var delay1 = 10;
var delay2 = 25;

var action1 = new Func<IEnumerable<string>, Task>(async x => {await Task.Delay(delay1); Interlocked.Increment(ref counter1);});
var action2 = new Func<IEnumerable<string>, Task>(async x => {await Task.Delay(delay2); Interlocked.Increment(ref counter2);});

var actionBlock1 = new ActionBlock<IEnumerable<string>>(action1, edbOptions);
var actionBlock2 = new ActionBlock<IEnumerable<string>>(action2, edbOptions);

var batchBlock1 = new BatchBlock<string>(5, gdbOptions);
var batchBlock2 = new BatchBlock<string>(5, gdbOptions);

batchBlock1.LinkTo(actionBlock1, dlOptions);
batchBlock2.LinkTo(actionBlock2, dlOptions);

var bufferBlock1 = new BufferBlock<string>(dbOptions); 
var bufferBlock2 = new BufferBlock<string>(dbOptions); 

bufferBlock1.LinkTo(batchBlock1, dlOptions);
bufferBlock2.LinkTo(batchBlock2, dlOptions);

var bcBlock = new BroadcastBlock<string>(x => x, dbOptions);

bcBlock.LinkTo(bufferBlock1, dlOptions);
bcBlock.LinkTo(bufferBlock2, dlOptions);

var mainBlock = new TransformBlock<int, string>(x => x.ToString(), edbOptions);
mainBlock.LinkTo(bcBlock, dlOptions);

mainBlock.Dump("Main Block");
bcBlock.Dump("Broadcast Block");
bufferBlock1.Dump("Buffer Block 1");
bufferBlock2.Dump("Buffer Block 2");
actionBlock1.Dump("Action Block 1");
actionBlock2.Dump("Action Block 2");

foreach(var i in Enumerable.Range(1, total))
  await mainBlock.SendAsync(i, cts.Token);

mainBlock.Complete();

await Task.WhenAll(actionBlock1.Completion, actionBlock2.Completion);

counter1.Dump("Counter 1");
counter2.Dump("Counter 2");

I have two issues with this code:

  1. Although I limited BoundedCapacity of all appropriate blocks to 10 elements, it seems like I can push all 1,000,000 messages almost at once. Is this expected behavior?
  2. Although the entire network is configured to propagate completion, it seems like all blocks get completed almost immediately after calling mainBlock.Complete(). I expect that both counter1 and counter2 variables to be equal to total. Is there a way to achieve such behavior?

回答1:

Yes, this is the expected behavior, because of the BroadcastBlock:

Provides a buffer for storing at most one element at time, overwriting each message with the next as it arrives.

This means that if you link BroadcastBlock to blocks with BoundedCapacity, you will lose messages.

To fix that, you could create a custom block that behaves like BroadcastBlock, but guarantees delivery to all targets. But doing that is not trivial, so you might be satisified with a simpler variant (originally from my old answer):

public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(
    IEnumerable<ITargetBlock<T>> targets, DataflowBlockOptions options)
{
    var targetsList = targets.ToList();

    var block = new ActionBlock<T>(
        async item =>
        {
            foreach (var target in targetsList)
            {
                await target.SendAsync(item);
            }
        }, new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = options.BoundedCapacity,
            CancellationToken = options.CancellationToken
        });

    block.Completion.ContinueWith(task =>
    {
        foreach (var target in targetsList)
        {
            if (task.Exception != null)
                target.Fault(task.Exception);
            else
                target.Complete();
        }
    });

    return block;
}

Usage in your case would be:

var bcBlock = CreateGuaranteedBroadcastBlock(
    new[] { bufferBlock1, bufferBlock2 }, dbOptions);