
TPL Dataflow: guarantee completion only when ALL source data blocks have completed

Posted 2019-08-03 03:02

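How can I rewrite this code so that it completes only after both TransformBlocks have completed? I thought "completion" meant that a block is marked as complete and its output queue is empty?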

using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public class Test
{
    private readonly BroadcastBlock<int> broadCastBlock;
    private readonly TransformBlock<int, string> transformBlock1;
    private readonly TransformBlock<int, string> transformBlock2;
    private readonly ActionBlock<string> processorBlock;

    public Test()
    {
        broadCastBlock = new BroadcastBlock<int>(i =>
            {
                return i;
            });

        transformBlock1 = new TransformBlock<int, string>(i =>
            {
                Console.WriteLine("1 input count: " + transformBlock1.InputCount);
                Thread.Sleep(50);
                return ("1_" + i);
            });

        transformBlock2 = new TransformBlock<int, string>(i =>
            {
                // Report this block's own input count (the original read transformBlock1 here).
                Console.WriteLine("2 input count: " + transformBlock2.InputCount);
                Thread.Sleep(20);
                return ("2_" + i);
            });

        processorBlock = new ActionBlock<string>(i =>
            {
                Console.WriteLine(i);
            });

        //Linking
        broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
        broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public void Start()
    {
        const int numElements = 100;

        for (int i = 1; i <= numElements; i++)
        {
            broadCastBlock.SendAsync(i);
        }

        //mark completion
        broadCastBlock.Complete();

        processorBlock.Completion.Wait();

        Console.WriteLine("Finished");
        Console.ReadLine();
    }
}

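I edited the code to print the input buffer count of each transform block. Clearly all 100 items are delivered to each transform block. But as soon as one of the transform blocks finishes, processorBlock stops accepting items, and the input buffer of the unfinished transform block just gets drained.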

Answer 1:

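The problem is exactly what casperOne said in his answer. Once the first transform block completes, the processor block goes into "finishing mode": it will process the items remaining in its input queue, but it won't accept any new items.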

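There is a simpler fix than splitting your processor block in two, though: don't set PropagateCompletion on the links into the processor block; instead, complete the processor block manually when both transform blocks have completed: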

Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion)
    .ContinueWith(_ => processorBlock.Complete());
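To make the fix concrete, here is a minimal self-contained sketch of the whole pipeline with this change applied. It assumes the System.Threading.Tasks.Dataflow library is referenced; the class name CompletionFixDemo, the shortened sleep times, and the counter that replaces Console.WriteLine are illustrative additions, not part of the original code. Completion still propagates from the broadcast block to both transform blocks; only the two links into the shared processor block drop PropagateCompletion.

```csharp
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo: broadcast -> two transforms -> one shared processor.
public static class CompletionFixDemo
{
    public static int Run(int numElements)
    {
        int processed = 0;
        var propagate = new DataflowLinkOptions { PropagateCompletion = true };

        var broadcast = new BroadcastBlock<int>(i => i);
        var transform1 = new TransformBlock<int, string>(i => { Thread.Sleep(5); return "1_" + i; });
        var transform2 = new TransformBlock<int, string>(i => { Thread.Sleep(2); return "2_" + i; });
        // Counts items instead of printing them (illustrative stand-in for Console.WriteLine).
        var processor = new ActionBlock<string>(s => Interlocked.Increment(ref processed));

        // The broadcast block is the only source of each transform, so propagation is safe here.
        broadcast.LinkTo(transform1, propagate);
        broadcast.LinkTo(transform2, propagate);

        // No PropagateCompletion: the first transform to finish would otherwise
        // complete the processor while the other one still has output.
        transform1.LinkTo(processor);
        transform2.LinkTo(processor);

        // Complete the processor only after BOTH transforms have completed.
        Task.WhenAll(transform1.Completion, transform2.Completion)
            .ContinueWith(_ => processor.Complete());

        for (int i = 1; i <= numElements; i++)
        {
            broadcast.Post(i);
        }
        broadcast.Complete();

        processor.Completion.Wait();
        return processed;
    }
}
```

A TransformBlock's Completion only finishes after its output has been handed off, so by the time the continuation runs, every transformed item is already queued in the processor; Run(100) should therefore return 200.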


Answer 2:

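The issue here is that you set the PropagateCompletion property every time you call the LinkTo method to link the blocks, combined with the different wait times in your transform blocks.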

From the documentation for the Complete method on the IDataflowBlock interface:

Signals to the IDataflowBlock that it should not accept nor produce any more messages nor consume any more postponed messages.

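Because you stagger the wait times in each of your TransformBlock<TInput, TOutput> instances, transformBlock2 (waiting 20 ms) finishes before transformBlock1 (waiting 50 ms). transformBlock2 completes first and then sends the completion signal to processorBlock, which then says "I'm not accepting anything else" (while transformBlock1 has not yet produced all of its messages).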
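The effect can be reproduced deterministically, without relying on timing. In the sketch below (class and variable names are hypothetical), two BufferBlock<int> instances stand in for the two transform blocks and are both linked to one ActionBlock<int> with PropagateCompletion = true. Completing only the "fast" source is enough to complete the shared target, after which the target declines further messages.

```csharp
using System.Threading.Tasks.Dataflow;

public static class EarlyCompletionDemo
{
    public static (bool targetCompleted, bool acceptedAfterCompletion) Run()
    {
        var link = new DataflowLinkOptions { PropagateCompletion = true };
        var fast = new BufferBlock<int>();  // plays the role of transformBlock2
        var slow = new BufferBlock<int>();  // plays the role of transformBlock1
        var target = new ActionBlock<int>(_ => { });

        // The target has two sources, each propagating completion.
        fast.LinkTo(target, link);
        slow.LinkTo(target, link);

        fast.Complete();           // only the fast source finishes...
        target.Completion.Wait();  // ...yet that alone completes the shared target

        // The slow source was never completed, but the target no longer accepts input.
        bool accepted = target.Post(42);
        return (target.Completion.IsCompleted, accepted);
    }
}
```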

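Note that transformBlock2 finishing before transformBlock1 is not absolutely guaranteed; it is possible for the thread pool (assuming you're using the default scheduler) to process the tasks in a different order (most likely it won't, but that becomes more likely once the 20 ms items are done and threads start stealing work from the queues).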

Your pipeline looks like this:

           broadcastBlock
          /              \
 transformBlock1   transformBlock2
          \              /
           processorBlock

To fix this, you want a pipeline that looks like this:

           broadcastBlock
          /              \
 transformBlock1   transformBlock2
          |              |
 processorBlock1   processorBlock2

This is accomplished by simply creating two separate ActionBlock<TInput> instances, like so:

// The action, can be a method, makes it easier to share.
Action<string> a = i => Console.WriteLine(i);

// Create the processor blocks.
processorBlock1 = new ActionBlock<string>(a);
processorBlock2 = new ActionBlock<string>(a);


// Linking
broadCastBlock.LinkTo(transformBlock1, 
    new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2, 
    new DataflowLinkOptions { PropagateCompletion = true });
transformBlock1.LinkTo(processorBlock1, 
    new DataflowLinkOptions { PropagateCompletion = true });
transformBlock2.LinkTo(processorBlock2, 
    new DataflowLinkOptions { PropagateCompletion = true });

Then you need to wait on both processor blocks instead of just one:

Task.WhenAll(processorBlock1.Completion, processorBlock2.Completion).Wait();
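Putting the pieces of this answer together, a runnable sketch of the split pipeline might look like the following. As before, it assumes System.Threading.Tasks.Dataflow is referenced; the class name and the per-processor counters are illustrative. Because each processor block now has exactly one source, PropagateCompletion is safe on every link.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class SplitProcessorDemo
{
    public static (int fromTransform1, int fromTransform2) Run(int numElements)
    {
        int count1 = 0, count2 = 0;
        var propagate = new DataflowLinkOptions { PropagateCompletion = true };

        var broadcast = new BroadcastBlock<int>(i => i);
        var transform1 = new TransformBlock<int, string>(i => "1_" + i);
        var transform2 = new TransformBlock<int, string>(i => "2_" + i);

        // One processor per transform; each counter is touched by a single block only.
        var processor1 = new ActionBlock<string>(_ => count1++);
        var processor2 = new ActionBlock<string>(_ => count2++);

        broadcast.LinkTo(transform1, propagate);
        broadcast.LinkTo(transform2, propagate);
        transform1.LinkTo(processor1, propagate);
        transform2.LinkTo(processor2, propagate);

        for (int i = 1; i <= numElements; i++)
        {
            broadcast.Post(i);
        }
        broadcast.Complete();

        // Wait for both branches, not just one.
        Task.WhenAll(processor1.Completion, processor2.Completion).Wait();
        return (count1, count2);
    }
}
```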

A very important note here: when you create an ActionBlock<TInput>, the default is for the MaxDegreeOfParallelism property on the ExecutionDataflowBlockOptions instance passed to it to be set to one.

This means that calls to the Action<T> delegate you pass to the ActionBlock<TInput> are thread-safe: only one will execute at a time.

Because you now have two ActionBlock<TInput> instances pointing to the same Action<T> delegate, thread safety is no longer guaranteed.

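If your method is thread-safe, then you don't have to do anything (and that would even let you set the MaxDegreeOfParallelism property to DataflowBlockOptions.Unbounded, since there's no reason to block).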
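For the thread-safe case, sharing one delegate between two ActionBlock<TInput> instances with unbounded parallelism could look like the sketch below. The Interlocked.Increment counter is an assumed stand-in for "a thread-safe method"; the class name and the direct posting loop (without transform blocks) are hypothetical simplifications.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ParallelProcessorDemo
{
    public static int Run(int numElements)
    {
        int processed = 0;

        // Thread-safe action: Interlocked makes concurrent increments safe,
        // so it can run in parallel within and across both blocks.
        Action<string> action = _ => Interlocked.Increment(ref processed);

        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        };
        var processor1 = new ActionBlock<string>(action, options);
        var processor2 = new ActionBlock<string>(action, options);

        for (int i = 1; i <= numElements; i++)
        {
            processor1.Post("1_" + i);
            processor2.Post("2_" + i);
        }
        processor1.Complete();
        processor2.Complete();

        Task.WhenAll(processor1.Completion, processor2.Completion).Wait();
        return processed;
    }
}
```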

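If it's not thread-safe and you need to guarantee thread safety, you have to resort to traditional synchronization primitives, such as the lock statement.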

In that case, you'd do it like so (although it's clearly not necessary here, since the WriteLine method on the Console class is thread-safe):

// The lock.
var l = new object();

// The action, can be a method, makes it easier to share.
Action<string> a = i => {
    // Ensure one call at a time.
    lock (l) Console.WriteLine(i);
};

// And so on...
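The lock earns its keep once the shared delegate touches state that is genuinely not thread-safe. In this hypothetical sketch, both processor blocks append to a single List<string>, which does not support concurrent writers, so every Add happens inside the lock; the class name and the direct posting loop are illustrative assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class LockedProcessorDemo
{
    public static int Run(int numElements)
    {
        var results = new List<string>();  // List<T> is not safe for concurrent writes
        var l = new object();

        // Shared action: each block runs it one call at a time, but the two
        // blocks can run it concurrently, hence the lock.
        Action<string> a = s =>
        {
            lock (l) results.Add(s);
        };

        var processor1 = new ActionBlock<string>(a);
        var processor2 = new ActionBlock<string>(a);

        for (int i = 1; i <= numElements; i++)
        {
            processor1.Post("1_" + i);
            processor2.Post("2_" + i);
        }
        processor1.Complete();
        processor2.Complete();

        Task.WhenAll(processor1.Completion, processor2.Completion).Wait();
        lock (l) return results.Count;
    }
}
```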


Answer 3:

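An addition to svick's answer: to get behavior consistent with what you get from the PropagateCompletion option, you also need to forward exceptions in case a preceding block faulted. An extension method like the following takes care of that as well: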

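using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Extension methods must be declared in a static class (the class name is illustrative).
public static class DataflowBlockExtensions
{
    // Completes target once every source has completed, or faults it with the
    // sources' exceptions if any of them faulted (mirroring PropagateCompletion).
    public static void CompleteWhenAll(this IDataflowBlock target, params IDataflowBlock[] sources) {
        if (target == null) return;
        if (sources.Length == 0) { target.Complete(); return; }
        Task.Factory.ContinueWhenAll(
            sources.Select(b => b.Completion).ToArray(),
            tasks => {
                var exceptions = (from t in tasks where t.IsFaulted select t.Exception).ToList();
                if (exceptions.Count != 0) {
                    target.Fault(new AggregateException(exceptions));
                } else {
                    target.Complete();
                }
            }
        );
    }
}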
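To see what goes wrong without this forwarding, the sketch below (names hypothetical) uses the plain Task.WhenAll(...).ContinueWith(_ => processor.Complete()) approach from the first answer and makes one transform block throw. The processor still completes normally, so the failure is visible only on the faulted transform block's own Completion; forwarding the exceptions with Fault, as CompleteWhenAll does, would fault the processor instead.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class SwallowedFaultDemo
{
    public static (TaskStatus processor, TaskStatus transform1) Run()
    {
        // transform1 faults on its third item; transform2 always succeeds.
        var transform1 = new TransformBlock<int, int>(i =>
        {
            if (i == 3) throw new InvalidOperationException("boom");
            return i;
        });
        var transform2 = new TransformBlock<int, int>(i => i);
        var processor = new ActionBlock<int>(_ => { });

        transform1.LinkTo(processor);
        transform2.LinkTo(processor);

        // Naive completion: runs whether the transforms succeeded or faulted.
        _ = Task.WhenAll(transform1.Completion, transform2.Completion)
            .ContinueWith(_ => processor.Complete());

        for (int i = 1; i <= 5; i++)
        {
            transform1.Post(i);
            transform2.Post(i);
        }
        transform1.Complete();
        transform2.Complete();

        processor.Completion.Wait();  // does not throw: the fault was swallowed
        return (processor.Completion.Status, transform1.Completion.Status);
    }
}
```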


Answer 4:

Other answers are quite clear about why PropagateCompletion=true messes things up when a block has more than one source.

To provide a simple solution to the problem, you may want to look at the open source library DataflowEx, which solves this kind of problem with smarter built-in completion rules. (It uses TPL Dataflow linking internally but supports complex completion propagation. The implementation looks similar to WhenAll but also handles links added dynamically. See Dataflow.RegisterDependency() and TaskEx.AwaitableWhenAll() for implementation details.)

I slightly changed your code to make everything work using DataflowEx:

public CompletionDemo1()
{
    broadCaster = new BroadcastBlock<int>(
        i =>
            {
                return i;
            }).ToDataflow();

    transformBlock1 = new TransformBlock<int, string>(
        i =>
            {
                Console.WriteLine("1 input count: " + transformBlock1.InputCount);
                Thread.Sleep(50);
                return ("1_" + i);
            });

    transformBlock2 = new TransformBlock<int, string>(
        i =>
            {
                Console.WriteLine("2 input count: " + transformBlock2.InputCount);
                Thread.Sleep(20);
                return ("2_" + i);
            });

    processor = new ActionBlock<string>(
        i =>
            {
                Console.WriteLine(i);
            }).ToDataflow();

    /** rather than TPL linking
      broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
      broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
      transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
      transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
     **/

    //Use DataflowEx linking
    var transform1 = transformBlock1.ToDataflow();
    var transform2 = transformBlock2.ToDataflow();

    broadCaster.LinkTo(transform1);
    broadCaster.LinkTo(transform2);
    transform1.LinkTo(processor);
    transform2.LinkTo(processor);
}

Full code is here.

Disclaimer: I am the author of DataflowEx, which is published under MIT license.



Source: TPL Dataflow, guarantee completion only when ALL source data blocks completed