我在写一些代码来处理大量的数据,我(至少我)认为这将是非常有用的Parallel.ForEach创建为它创建这样的输出并不需要同步的每个线程的文件。
它看起来是这样的:
Parallel.ForEach(vals,
new ParallelOptions { MaxDegreeOfParallelism = 8 },
()=>GetWriter(), // returns a new BinaryWriter backed by a file with a guid name
(item, state, writer)=>
{
if(something)
{
state.Break();
return writer;
}
List<Result> results = new List<Result>();
foreach(var subItem in item.SubItems)
results.Add(ProcessItem(subItem));
if(results.Count > 0)
{
foreach(var result in results)
result.Write(writer);
}
return writer;
},
(writer)=>writer.Dispose());
我希望发生的是,多达8个文件将被创建,并会在整个运行时间持续。 然后,当整个的ForEach呼叫完成每个将设置。 真正发生的是,localInit似乎对每个项目进行一次调用,所以我结束了数百个文件。 作家也越来越设置在被处理的每个项目的结束。
这表明发生了同样的事情:
var vals = Enumerable.Range(0, 10000000).ToArray();
long sum = 0;
Parallel.ForEach(vals,
new ParallelOptions { MaxDegreeOfParallelism = 8 },
() => { Console.WriteLine("init " + Thread.CurrentThread.ManagedThreadId); return 0L; },
(i, state, common) =>
{
Thread.Sleep(10);
return common + i;
},
(common) => Interlocked.Add(ref sum, common));
我知道了:
init 10
init 14
init 11
init 13
init 12
init 14
init 11
init 12
init 13
init 11
... // hundreds of lines over < 30 seconds
init 14
init 11
init 18
init 17
init 10
init 11
init 14
init 11
init 14
init 11
init 18
注:如果我离开了Thread.sleep代码调用,它有时似乎“正常”功能。 localInit只被调用一次,每次为它决定在我的电脑使用4个线程。 并不是每一个时间,但是。
这是函数的期望行为? 这是怎么回事,导致它做幕后? 最后,有什么好办法让我的期望的功能,ThreadLocal的?
这是在.NET 4.5,顺便说一句。
Parallel.ForEach
因为你认为它不工作。 需要注意的是该方法是建立在顶部是非常重要的Task
类和关系的Task
和Thread
不是1:1。 你可以有,例如,在2个托管线程运行10个任务。
尝试在你的方法体,而不是当前使用该行:
Console.WriteLine("ThreadId {0} -- TaskId {1} ",
Thread.CurrentThread.ManagedThreadId, Task.CurrentId);
你应该看到ThreadId
会在许多不同的任务可以重复使用,通过他们的唯一ID所示。 你会看到这个更多,如果你留在,或增加,你调用Thread.Sleep
。
的如何的(非常)基本思路Parallel.ForEach
方法的工作原理,是它需要你的枚举创建了一系列将运行枚举进程的部分任务,这样做的方式依赖于输入了很多。 还有一个检查超过一定的毫秒数没有完成任务的情况下,一些特殊的逻辑。 如果情况属实,那么一个新的任务可能催生以帮助缓解工作。
如果你看的文档localinit
在功能Parallel.ForEach
,你会发现,它说,它returns the initial state of the local data for each _task_
,不是每个线程 。
你可能会问,为什么有正在催生超过8个任务。 这个问题的答案是类似上,对于文档中找到ParallelOptions.MaxDegreeOfParallelism
。
更改MaxDegreeOfParallelism
从默认的唯一限制很多并发任务将被如何使用。
此限制仅适用于并发任务的数量上,而不是将在它处理的整个时间被创建的任务数的硬性限制。 正如我上面提到的,有次在那里一个独立的任务将被催生,这将导致你的localinit
函数被调用多次,写几百个文件的磁盘。
写入磁盘肯定是有一点延迟的操作,特别是如果你使用同步I / O。 当磁盘操作发生,它会阻止整个主题; 同样的情况与Thread.Sleep
。 如果一个Task
做到这一点,它会阻止它当前正在运行的线程,并没有其他任务可以在其上运行。 通常在这种情况下,调度将产生一个新的Task
,以帮助收拾残局。
最后,有什么好办法让我的期望的功能,ThreadLocal的?
底线是线程局部变量没有任何意义与Parallel.ForEach
因为你不处理线程; 你处理的任务。 一个线程局部可能任务间共享,因为许多任务都可以在同一时间使用相同的线程。 另外,任务的线程局部可能改变中期执行,因为调度程序能够运行抢占它,然后继续在不同的线程,这将有一个不同的线程本地执行。
我不知道这样做的最佳方式,但你可以依靠localinit
功能在你喜欢的任何资源通过,只允许将一个线程在同一时间使用的资源。 您可以使用localfinally
来标记它在使用时不再,因而可用于其它任务获取。 这就是这些方法旨在为; 每一次任务是催生了每个方法只调用(见的备注部分Parallel.ForEach
MSDN文档)。
你也可以自己拆的工作,并创建自己的组线程和运行工作。 然而,这是少的想法,在我看来,由于Parallel
类已经这样做了繁重的你。
你看到的是试图让你的工作尽快完成落实。
要做到这一点,它会尝试使用任务的不同数量的最大化吞吐量。 它抓住线程池中一定数量的线程并运行你的一些工作。 然后,它会尝试添加和删除线程来看看会发生什么。 直到所有的工作完成后,它将继续这样做。
该算法是,如果你的工作是使用大量的CPU,或者大量的IO,或者即使有大量的同步和线程互相阻挡它不知道很愚蠢的。 它所能做的就是添加和删除线程和测量工作的各单位如何快速完成。
这意味着它是不断地打电话给你localInit
和localFinally
功能,因为它注入和退休线程-这是你有什么发现。
不幸的是,控制这种算法没有简单的方法。 Parallel.ForEach
是一个高层次的构建体,其有意地隐藏了的线程管理代码。
使用ThreadLocal
可能有点帮助,但是它依赖于一个事实,即线程池,将重复使用相同的线程时Parallel.ForEach
要求换新。 这是不保证的广告 - 事实上,这是不可能的线程池将使用完全相同8线程整个呼叫。 这意味着你将再次超过必要创造更多的文件。
这是保证一件事是, Parallel.ForEach
永远不会使用超过MaxDegreeOfParallelism
线程在任何一个时间。
您可以通过创建可重复使用的任何一个线程在特定时间运行的文件的固定大小的“池子”使用你的优势。 你知道,只有MaxDegreeOfParallelism
线程可以同时运行,所以你可以在调用之前创建的文件数ForEach
。 然后抓住一个在你localInit
并在释放localFinally
。
当然,你将不得不自己写这池,它必须是线程安全的,因为它会被同时调用。 一个简单的锁定策略就足够好了,但因为线程相比,锁的成本不注入并很快退休。
根据MSDN的localInit
方法为每个任务调用一次,不是为每个线程:
该localInit委托为参与循环的执行,并返回初始本地状态为每个这些任务每个任务调用一次。
线程创建时localInit调用。 如果身体需要很长时间,必须创建另一个线程并挂起当前线程,如果它创建另一个线程,它调用localInit
此外,当Parallel.ForEach人称其为多创建线程,例如MaxDegreeOfParallelism值:
var k = Enumerable.Range(0, 1);
Parallel.ForEach(k,new ParallelOptions(){MaxDegreeOfParallelism = 4}.....
它创建四线程时,首先它叫
文章来源: Why does the localInit Func get called multiple times per thread in Parallel.ForEach