我使用任务并行库可量化和可重复的问题, BlockingCollection<T>
ConcurrentQueue<T>
GetConsumingEnumerable
试图创建一个简单的管道。
概括地说,将条目添加到默认BlockingCollection<T>
其中发动机盖是依靠一下ConcurrentQueue<T>
从一个线程,并不能保证它们将被弹出的BlockingCollection<T>
从另一个线程调用GetConsumingEnumerable()
方法。
我创建了一个非常简单的WinForms应用程序复制/模拟这个刚刚打印整数到屏幕上。
-
Timer1
负责排队的工作项目......它使用一个叫做并发字典_tracker
以便它知道它已经添加到收藏阻塞。 -
Timer2
只是记录双方的计数状态BlockingCollection
与该的_tracker
- START按钮序幕一个
Paralell.ForEach
它简单地迭代阻挡集合GetConsumingEnumerable()
并开始它们印刷到第二列表框。 - STOP按钮停止
Timer1
防止被添加到所述阻挡收集多个条目。
public partial class Form1 : Form
{
private int Counter = 0;
private BlockingCollection<int> _entries;
private ConcurrentDictionary<int, int> _tracker;
private CancellationTokenSource _tokenSource;
private TaskFactory _factory;
public Form1()
{
_entries = new BlockingCollection<int>();
_tracker = new ConcurrentDictionary<int, int>();
_tokenSource = new CancellationTokenSource();
_factory = new TaskFactory();
InitializeComponent();
}
private void timer1_Tick(object sender, EventArgs e)
{ //ADDING TIMER -> LISTBOX 1
for(var i = 0; i < 3; i++,Counter++)
{
if (_tracker.TryAdd(Counter, Counter))
_entries.Add(Counter);
listBox1.Items.Add(string.Format("Adding {0}", Counter));
}
}
private void timer2_Tick_1(object sender, EventArgs e)
{ //LOGGING TIMER -> LIST BOX 3
listBox3.Items.Add(string.Format("Tracker Count : {0} / Entries Count : {1}", _tracker.Count, _entries.Count));
}
private void button1_Click(object sender, EventArgs e)
{ //START BUTTON -> LOGS TO LIST BOX 2
var options = new ParallelOptions {
CancellationToken = _tokenSource.Token,
MaxDegreeOfParallelism = 1
};
_factory.StartNew(() => { Parallel.ForEach(_entries.GetConsumingEnumerable(), options, DoWork); });
timer1.Enabled = timer2.Enabled = true;
timer1.Start();
timer2.Start();
}
private void DoWork(int entry)
{
Thread.Sleep(1000); //Sleep for 1 second to simulate work being done.
Invoke((MethodInvoker)(() => listBox2.Items.Add(string.Format("Processed {0}", entry))));
int oldEntry;
_tracker.TryRemove(entry, out oldEntry);
}
private void button2_Click(object sender, EventArgs e)
{ //STOP BUTTON
timer1.Stop();
timer1.Enabled = false;
}
以下是事件序列:
- 按开始
- 定时器1蜱&ListBox1中立即用3个的消息来更新(添加0,1,2)
- ListBox2被随后更新与3条消息,间隔1秒
- 加工0
- 处理1
- 处理2
- 定时器1蜱&ListBox1中立即用3个的消息来更新(添加3,4,5)
- ListBox2被sbsequent与2条消息,间隔1秒更新
- 处理3
- 处理4
- 处理5不打印......似乎已经“失踪”
- 按停止,以避免更多的消息通过定时器1被添加
- 等待...“处理5”仍然没有出现
你可以看到,并发字典仍然追踪1项尚未处理和随后从去除_tracker
如果我再按开始,然后开始TIMER1添加更多的3个条目和并行循环起死回生打印5,6,7和8。
我在一个完全丧失,为什么发生这种情况。 调用再次开始明显调用newtask,它调用一个Paralell的foreach,并重新执行GetConsumingEnumerable(),它奇迹般地找到了丢失的入口......我
为什么BlockingCollection.GetConsumingEnumerable()
不能保证到了加的集合中的每个项目迭代。
为什么添加更多的条目随后使其得到“脱胶”,并继续它的处理?