TPL数据流消费者一次处理多个项目(TPL Dataflow Consumer to Process

2019-10-21 07:11发布

我有一个要求,通过大名单迭代和每个项目调用Web服务来获取一些数据。 不过,我想限制请求到WS的数量说不超过5个在任一个时刻执行的并发请求。 到WS,全部电话都使用由async/await 。 我现在用的是TPL数据流BufferBlockBoundedCapacity 5.所有的东西都工作正常,但我所注意到的是顺序执行的消费者,其等待WS调用,块队列中,直到其完成导致在bufferblock所有请求。 是否有可能有消费者总是在一次处理5个项目从队列? 或者我需要设置多个消费者还是开始寻找到行动块? 所以,总之我想和5项种子队列中。 作为一个项目被处理第六将取而代之等等,所以我总是有5个下去,直到没有更多的项目来处理并发请求。

我用这个作为我的向导: 异步生产者/消费者队列使用数据流

谢谢你的帮助。 下面是代码的简化版本

//set up
BufferBlock<CustomObject> queue = new BufferBlock<CustomObject>(new DataflowBlockOptions { BoundedCapacity = 5 });
var producer = QueueValues(queue, values);
var consumer = ConsumeValues(queue);
await Task.WhenAll(producer, consumer, queue.Completion);
counter = await consumer;

//producer
function QueueValues(BufferBlock<CustomObject> queue, IList<CustomObject> values)
{

    foreach (CustomObject value in values)
    {
        await queue.SendAsync(value);
    }
    queue.Complete();
}


//consumer
function ConsumeValues(BufferBlock<CustomObject> queue)
{
    while (await queue.OutputAvailableAsync())
    {
        CustomObject value = await queue.ReceiveAsync();
            await CallWebServiceAsync(value);
    }
}

Answer 1:

您使用TPL数据流是比较奇怪的。 通常情况下,你的移动消费和加工成流 。 附加一个TransformBlock调用web服务。 删除ConsumeValues

ConsumeValues依次执行这根本不是你想要的。

取而代之的BoundedCapacity我觉得您更希望MaxDegreeOfParallelism



Answer 2:

您应该使用ActionBlockMaxDegreeOfParallelism设置为5。你也可以设置一个BoundedCapacity但这是节流生产者而不是消费者:

var block = new ActionBlock<CustomObject>(
    item => CallWebServiceAsync(item), 
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 5,
        BoundedCapacity = 1000
    });

foreach (CustomObject value in values)
{
    await block.SendAsync(value);
}
block.Complete();
await block.Completion;


文章来源: TPL Dataflow Consumer to Process Multiple Items at a time