为什么迭代GetConsumingEnumerable()不是完全空的根本阻挡集合(Why does

2019-07-29 00:50发布

我使用任务并行库可量化和可重复的问题, BlockingCollection<T> ConcurrentQueue<T> GetConsumingEnumerable试图创建一个简单的管道。

概括地说,将条目添加到默认BlockingCollection<T>其中发动机盖是依靠一下ConcurrentQueue<T>从一个线程,并不能保证它们将被弹出的BlockingCollection<T>从另一个线程调用GetConsumingEnumerable()方法。

我创建了一个非常简单的WinForms应用程序复制/模拟这个刚刚打印整数到屏幕上。

  • Timer1负责排队的工作项目......它使用一个叫做并发字典_tracker以便它知道它已经添加到收藏阻塞。
  • Timer2只是记录双方的计数状态BlockingCollection与该的_tracker
  • START按钮序幕一个Paralell.ForEach它简单地迭代阻挡集合GetConsumingEnumerable()并开始它们印刷到第二列表框。
  • STOP按钮停止Timer1防止被添加到所述阻挡收集多个条目。
public partial class Form1 : Form
{
    private int Counter = 0;
    private BlockingCollection<int> _entries;
    private ConcurrentDictionary<int, int> _tracker;
    private CancellationTokenSource _tokenSource;
    private TaskFactory _factory;

    public Form1()
    {
        _entries = new BlockingCollection<int>();
        _tracker = new ConcurrentDictionary<int, int>();
        _tokenSource = new CancellationTokenSource();
        _factory = new TaskFactory(); 
        InitializeComponent();
    }

    private void timer1_Tick(object sender, EventArgs e)
    { //ADDING TIMER -> LISTBOX 1
        for(var i = 0; i < 3; i++,Counter++)
        {
            if (_tracker.TryAdd(Counter, Counter))
            _entries.Add(Counter);
            listBox1.Items.Add(string.Format("Adding {0}", Counter));
        }
    }

    private void timer2_Tick_1(object sender, EventArgs e)
    { //LOGGING TIMER -> LIST BOX 3
        listBox3.Items.Add(string.Format("Tracker Count : {0} / Entries Count : {1}", _tracker.Count, _entries.Count));
    }

    private void button1_Click(object sender, EventArgs e)
    { //START BUTTON -> LOGS TO LIST BOX 2

        var options = new ParallelOptions {
                                CancellationToken = _tokenSource.Token,
                                MaxDegreeOfParallelism = 1
                            };

        _factory.StartNew(() => { Parallel.ForEach(_entries.GetConsumingEnumerable(), options, DoWork); });

        timer1.Enabled = timer2.Enabled = true;
        timer1.Start();
        timer2.Start();
    }

    private void DoWork(int entry)
    {
        Thread.Sleep(1000); //Sleep for 1 second to simulate work being done.
        Invoke((MethodInvoker)(() => listBox2.Items.Add(string.Format("Processed {0}", entry))));
        int oldEntry;
        _tracker.TryRemove(entry, out oldEntry);
    }

    private void button2_Click(object sender, EventArgs e)
    { //STOP BUTTON
        timer1.Stop();
        timer1.Enabled = false;
    }

以下是事件序列:

  • 按开始
  • 定时器1蜱&ListBox1中立即用3个的消息来更新(添加0,1,2)
  • ListBox2被随后更新与3条消息,间隔1秒
    • 加工0
    • 处理1
    • 处理2
  • 定时器1蜱&ListBox1中立即用3个的消息来更新(添加3,4,5)
  • ListBox2被sbsequent与2条消息,间隔1秒更新
    • 处理3
    • 处理4
    • 处理5不打印......似乎已经“失踪”
  • 按停止,以避免更多的消息通过定时器1被添加
  • 等待...“处理5”仍然没有出现

你可以看到,并发字典仍然追踪1项尚未处理和随后从去除_tracker

如果我再按开始,然后开始TIMER1添加更多的3个条目和并行循环起死回生打印5,6,7和8。

我在一个完全丧失,为什么发生这种情况。 调用再次开始明显调用newtask,它调用一个Paralell的foreach,并重新执行GetConsumingEnumerable(),它奇迹般地找到了丢失的入口......我

为什么BlockingCollection.GetConsumingEnumerable()不能保证到了加的集合中的每个项目迭代。

为什么添加更多的条目随后使其得到“脱胶”,并继续它的处理?

Answer 1:

不能使用GetConsumingEnumerable()Parallel.ForEach()

使用GetConsumingPartitioner从TPL演员

在博客中,你也会知道为什么不能使用的解释GetConsumingEnumerable()

通过既Parallel.ForEach和PLINQ使用,以便最小化成本的同步分块默认使用的分区算法:而不是每个元件采取锁定一次,它会采取锁,抓住一组元素(块),然后解除锁定。

即Parallel.ForEach等待,直到它接收继续之前的一组工作项。 实验表明究竟是什么。



Answer 2:

作为.NET 4.5的,你可以创建一个分区,将在同一时间只需要1项:

var partitioner = Partitioner.Create(jobsBatchesQ.queue.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering);
Parallel.ForEach(partitioner, new ParallelOptions { MaxDegreeOfParallelism = (currentTask.ParallelLevel > 0 ? currentTask.ParallelLevel : 1) }, (batch, state) => {//do stuff}

https://msdn.microsoft.com/en-us/library/system.collections.concurrent.enumerablepartitioneroptions(v=vs.110).aspx



Answer 3:

我无法复制用简单的控制台应用程序基本上做同样的事情(在.NET 4.5测试版运行,这可能有所作为)你的行为。 但我认为出现这种情况的原因是, Parallel.ForEach()试图通过拆分输入收集成块,以优化执行。 并与你的枚举,不能在您添加更多的项目集合创建了一大块。 欲了解更多信息,请参阅用于PLINQ和TPL MSDN上的自定义Partitioners

为了解决这个问题,不要使用Parallel.ForEach() 如果你仍然想并行处理的项目,你就可以开始一个Task在每个迭代。



Answer 4:

我觉得,在你能够调用BlockingCollection的.CompleteAdding()方法执行Parallel.foreach之前到的情况下,这个问题你描述上面会不会有问题,我应该注意的只是为了清楚起见。 我以极大的结果使用这两个对象一起多次。

此外,您可以随时重新设置您的BlockingCollection调用CompleteAdding()在需要时添加更多的项目后(_entries =新BlockingCollection();)

上述变更如下将与欠缺的出入解决您的问题,并使其按预期工作,如果你单击开始和停止按钮多次点击事件代码:

private void button2_Click(object sender, EventArgs e)
{ //STOP BUTTON
    timer1.Stop();
    timer1.Enabled = false;
>>>>_entries.CompleteAdding();
>>>>_entries = new BlockingCollection<int>();
}


文章来源: Why does iterating over GetConsumingEnumerable() not fully empty the underlying blocking collection