当我在我的程序中使用Parallel.ForEach,我发现某些线程似乎从来没有完成。 事实上,它不停地一遍又一遍产生新的线程,我没想到一个行为,绝对不想。
我能够再现与下面的代码,就像我的“真实”的节目,无论是使用处理器和内存很多(.NET 4.0的代码)这个行为:
public class Node
{
public Node Previous { get; private set; }
public Node(Node previous)
{
Previous = previous;
}
}
public class Program
{
public static void Main(string[] args)
{
DateTime startMoment = DateTime.Now;
int concurrentThreads = 0;
var jobs = Enumerable.Range(0, 2000);
Parallel.ForEach(jobs, delegate(int jobNr)
{
Interlocked.Increment(ref concurrentThreads);
int heavyness = jobNr % 9;
//Give the processor and the garbage collector something to do...
List<Node> nodes = new List<Node>();
Node current = null;
for (int y = 0; y < 1024 * 1024 * heavyness; y++)
{
current = new Node(current);
nodes.Add(current);
}
TimeSpan elapsed = DateTime.Now - startMoment;
int threadsRemaining = Interlocked.Decrement(ref concurrentThreads);
Console.WriteLine("[{0:mm\\:ss}] Job {1,4} complete. {2} threads remaining.", elapsed, jobNr, threadsRemaining);
});
}
}
当我四核运行,它最初开始的4个并发线程,就像你期望的那样。 然而,随着时间的推移正在创造越来越多的线程。 最终,这个节目则抛出一个OutOfMemoryException:
[00:00] Job 0 complete. 3 threads remaining.
[00:01] Job 1 complete. 4 threads remaining.
[00:01] Job 2 complete. 4 threads remaining.
[00:02] Job 3 complete. 4 threads remaining.
[00:05] Job 9 complete. 5 threads remaining.
[00:05] Job 4 complete. 5 threads remaining.
[00:05] Job 5 complete. 5 threads remaining.
[00:05] Job 10 complete. 5 threads remaining.
[00:08] Job 11 complete. 5 threads remaining.
[00:08] Job 6 complete. 5 threads remaining.
...
[00:55] Job 67 complete. 7 threads remaining.
[00:56] Job 81 complete. 8 threads remaining.
...
[01:54] Job 107 complete. 11 threads remaining.
[02:00] Job 121 complete. 12 threads remaining.
..
[02:55] Job 115 complete. 19 threads remaining.
[03:02] Job 166 complete. 21 threads remaining.
...
[03:41] Job 113 complete. 28 threads remaining.
<OutOfMemoryException>
对于上述实验中的内存使用情况图表如下所示:
( 截图是在荷兰,顶部部分代表处理器使用,底部的内存使用。)正如你所看到的,它看起来像一个新的线程几乎每一个垃圾收集器的方式获取(如可以看到时间被催生在存储器使用的倾角)。
谁能解释为什么发生这种情况,并且我能做些什么呢? 我只想.NET停止产生新的线程,先完成现有线程...
您可以限制得到通过指定创建的线程的最大数量ParallelOptions
与实例MaxDegreeOfParallelism
属性设置:
var jobs = Enumerable.Range(0, 2000);
ParallelOptions po = new ParallelOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount
};
Parallel.ForEach(jobs, po, jobNr =>
{
// ...
});
至于为什么你得到你观察行为:TPL(这underlies PLINQ),默认情况下,随意猜测线程使用的最佳数目。 每当一个并行任务块,任务调度会,以保持进步创建一个新的线程。 在你的情况下,阻止可能发生的隐式; 例如,通过Console.WriteLine
呼叫,或(如你观察到的)中的垃圾收集。
从并发级别与任务并行库调整(多少线程使用?) :
由于TPL默认的策略是使用每个处理器一个线程,我们可以得出结论,TPL最初假定任务的工作量〜100%的工作和0%的等待,如果最初的假设不成立,任务进入等待状态(即开始封锁) - TPL与冒昧地添加线程合适。
你或许应该阅读一些有关的任务调度是如何工作的。
http://msdn.microsoft.com/en-us/library/ff963549.aspx (后者的页面的一半)
“在.NET线程池自动管理工作者线程数池中它添加和删除线程根据内置启发式的.NET线程池具有注入线程两种主要机制:。,增加了工人饥饿避免机制线程,如果它认为没有进步正在对排队的项目,并试图在使用尽可能少的线程尽可能地最大化吞吐量爬山启发式制作。
饥饿避税的目的是为了防止死锁。 当工人线程等待,只能由这仍然是在线程池中的全球或本地队列挂起工作项目应满足的同步事件,可能会发生这种僵局。 如果有工作线程固定数量,以及所有那些线程类似地封闭,系统就无法不断取得新的进展。 添加一个新的工作线程可以解决该问题。
爬山启发式的一个目标是改善核心的利用率当线程是由停止处理器的I / O或其它等待条件封锁。 默认情况下,托管线程池每个核心一个工作线程。 如果这些工作线程被阻塞,有一个机会,一个核心可以根据电脑的整体工作量不能充分利用。 线程注入逻辑不该真实阻塞线程而这执行冗长,处理器密集型操作的线程之间进行区分。 因此,每当线程池的全局或局部队列包含挂起的工作项目,这需要很长的时间来运行(超过半秒以上)活动工作项可以触发新的线程池的工作线程的创建“。
您可以任务标记为LongRunning但这从线程池,这意味着任务不能被内联外分配它一个线程的副作用。
请记住,ParallelFor会将其给出块,这样即使在一个循环的工作是相当小的,通过外观调用的任务做了全面工作,可能会出现更长的时间来调度工作。
拨打到在他们的自我和GC不会阻止(它运行在一个单独的线程),但如果您等待GC完成那么这确实块。 也请记住,在GC被重新排列内存中,因此这可能有一些副作用(和阻塞),如果你想在运行GC分配内存。 我没有具体细节在这里,但我知道PPL有一定的内存分配功能,专为并行内存管理这个原因。
看你的代码的输出似乎事情是许多秒运行。 所以,我并不感到惊讶,你看到线程注入。 不过,我记得好象默认的线程池的大小大约是30个线程(可能取决于内核的系统上的数字)。 一个线程占用了大约一个MB内存之前,你的代码分配任何更多,所以我不明白为什么你可以在这里得到一个内存不足的异常。
我已经发布了后续问题“如何计算在.NET应用程序的并发线程数量?”
如果计算线程直接,他们的Parallel.For号()主要是((很少和不显着减少)仅增加和循环完成后不releleased。
在发布和调试模式经过这一点,与
ParallelOptions po = new ParallelOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount
};
而不
该数字有所不同,但结论是相同的。
下面是我用的是现成的代码,如果有人想一起玩:
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Edit4Posting
{
public class Node
{
public Node Previous { get; private set; }
public Node(Node previous)
{
Previous = previous;
}
}
public class Edit4Posting
{
public static void Main(string[] args)
{
int concurrentThreads = 0;
int directThreadsCount = 0;
int diagThreadCount = 0;
var jobs = Enumerable.Range(0, 160);
ParallelOptions po = new ParallelOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount
};
Parallel.ForEach(jobs, po, delegate(int jobNr)
//Parallel.ForEach(jobs, delegate(int jobNr)
{
int threadsRemaining = Interlocked.Increment(ref concurrentThreads);
int heavyness = jobNr % 9;
//Give the processor and the garbage collector something to do...
List<Node> nodes = new List<Node>();
Node current = null;
//for (int y = 0; y < 1024 * 1024 * heavyness; y++)
for (int y = 0; y < 1024 * 24 * heavyness; y++)
{
current = new Node(current);
nodes.Add(current);
}
//*******************************
directThreadsCount = Process.GetCurrentProcess().Threads.Count;
//*******************************
threadsRemaining = Interlocked.Decrement(ref concurrentThreads);
Console.WriteLine("[Job {0} complete. {1} threads remaining but directThreadsCount == {2}",
jobNr, threadsRemaining, directThreadsCount);
});
Console.WriteLine("FINISHED");
Console.ReadLine();
}
}
}