-->

TPL数据流ProducerConsumer模式(TPL Dataflow ProducerCons

2019-08-06 06:36发布

刚刚撰写了使用TPL数据流的样品生产者消费者模式。 我有一些基本的问题在这里。

  1. 消费者是积极的所有项目都从生产者发布之后。 是否异步意味着这两个生产和消费的任务可以并行运行。

  2. 鉴于消费者的睡眠时间,以验证其是否阻塞其他数据项。 这似乎是在执行顺序,并没有得到任何的并行性。

难道我做错了什么吗?

class AscDataBlocks
{
    public Int64 start;
    public Int64 End;
    //public string ThreadName;
    public void AscBufferProducer(ITargetBlock<Int64> targetAscTransform) 
    // This is using TPL DataBlock producer consumer pattern.
    {
        for (var i = start; i < End; i++)
        {
            Console.WriteLine("Postingasc : {0}", i);
            targetAscTransform.Post(i);
        }
    }

    public void ProcessDataBuffer(Int64 ascDataSet)
    {
        if (ascDataSet == 5) 
        // Testing if this will delay all the other data processing
            Thread.Sleep(5000);
        else
            Thread.Sleep(500);                
        Console.WriteLine(ascDataSet);
    }

    // Demonstrates the consumption end of the producer and consumer pattern. 
    public async Task<Int64> AscTransConsumerAsync(IReceivableSourceBlock<Int64> source)
    {
        // Initialize a counter to track the number of bytes that are processed. 
        int status = 0;

        // Read from the source buffer until the source buffer has no  
        // available output data. 
        while (await source.OutputAvailableAsync())
        {
            Int64 data;
            source.TryReceive(out data);
            ProcessDataBuffer(data);//This function processed the data buffer for ascollection and writes the data to the database.
            // Increment the count of bytes received.
            status = 1;
        }
        return status;
    }
}

static void Main(string[] args)
{
    AscDataBlocks ascb;
    BufferBlock<Int64> ascbuffer;
    System.Threading.Tasks.Task<Int64> ascProcessingconsumer;
    CancellationToken ct = new CancellationToken();
    CancellationTokenSource cts = new CancellationTokenSource();
    ascb = new AscDataBlocks();
    ascb.start = 1;
    ascb.End = 100;           
    try
    {
        ascbuffer = new BufferBlock<Int64>(new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 5,
            CancellationToken = ct
        });// Initiallize the buffer bloack
        ascProcessingconsumer = ascb.AscTransConsumerAsync(ascbuffer); //Initialize the consumer.
        //Post source data to the dataflow block.
        ascb.AscBufferProducer(ascbuffer);
        ascProcessingconsumer.Wait();
    }
    catch (Exception ex)
    {
        //foreach (var v in ex.InnerExceptions)
        //    Console.WriteLine("msg: " + v.Message);                
    }            
}  

Answer 1:

消费者是积极的所有项目都从生产者发布之后。 是否异步意味着这两个生产和消费的任务可以并行运行。

这是因为你发布你所有的项目非常快,消费者有机会再开始。 如果添加Thread.Sleep(100)你会看到他们实际上并行工作。

鉴于消费者的睡眠时间,以验证其是否阻塞其他数据项。 这似乎是在执行顺序,并没有得到任何的并行性。

TPL数据流是不是魔术:它不会改变你的代码并行执行。 这是你谁调用AscTransConsumerAsync()一次,所以不感到惊讶,它实际上只执行一次。

TDF不支持处理并行,但你需要真正让它执行的处理代码。 要做到这一点,利用执行块之一。 在你的情况ActionBlock似乎是适当的。

如果使用,就可以配置设置并行执行块MaxDegreeOfParallelism 。 当然,这样做意味着你需要确保加工委托是线程安全的。

就这样, AscTransConsumerAsync()现在看起来是这样的:

public async Task<Int64> AscTransConsumerAsync(ISourceBlock<Int64> source)
{
    // counter to track the number of items that are processed
    Int64 count = 0;

    var actionBlock = new ActionBlock<Int64>(
        data =>
        {
            ProcessDataBuffer(data);
            // count has to be accessed in a thread-safe manner
            // be careful about using Interlocked,
            // for more complicated computations, locking might be more appropriate
            Interlocked.Increment(ref count);
        },
        // some small constant might be better than Unbounded, depedning on circumstances
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

    source.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

    // this assumes source will be completed when done,
    // you need to call ascbuffer.Complete() after AscBufferProducer() for this
    await actionBlock.Completion;

    return count;
}


文章来源: TPL Dataflow ProducerConsumer Pattern