如何与并行的ForEach当地的初始化工作?(How does local initializati

2019-08-16 22:29发布

我不确定使用在Parallel.ForEach当地的初始化函数,因为在MSDN文章中描述: http://msdn.microsoft.com/en-us/library/dd997393.aspx

Parallel.ForEach<int, long>(nums, // source collection
   () => 0, // method to initialize the local variable
   (j, loop, subtotal) => // method invoked by the loop on each iteration
   {
      subtotal += nums[j]; //modify local variable 
      return subtotal; // value to be passed to next iteration
   },...

怎样()=> 0初始化什么? 什么是变量的名字,我怎么可以用它在循环逻辑?

Answer 1:

参照以下过载的的Parallel.ForEach静态扩展方法:

public static ParallelLoopResult ForEach<TSource, TLocal>(
    IEnumerable<TSource> source,
    Func<TLocal> localInit,
    Func<TSource, ParallelLoopState, TLocal, TLocal> taskBody,
    Action<TLocal> localFinally
)

在您的具体实例

该行:

() => 0, // method to initialize the local variable

是一个简单的拉姆达(匿名函数),这将返回该恒定整数零。 此拉姆达被传递作为localInit参数Parallel.ForEach -因为拉姆达返回一个整数,它具有输入Func<int>并键入TLocal可以被推断为int由编译器(类似地, TSource可以从的类型来推断集合作为参数传递source

的返回值(0),然后作为第三参数(名为传递subtotal )至taskBody Func 。 这个(0)用于体循环中的初始种子:

(j, loop, subtotal) =>
{
    subtotal += nums[j]; //modify local variable (Bad idea, see comment)
    return subtotal;     // value to be passed to next iteration
}

这第二拉姆达(传递给taskBody )被称为N次,其中N是由TPL分区分配给该任务的项目数。

到第二每个后续呼叫taskBody拉姆达将传递的新值subTotal ,有效地计算一运行局部总,对于该任务。 分配给该任务的所有物品已被添加之后,第三和最后一个, localFinally函数参数将被调用,再次,路过的终值subtotal从返回taskBody 。 由于几个这样的任务,将并行运行,也将需要将所有部分总数加起来到最后的“盛大”总最后一个步骤。 然而,因为多个并行任务(在不同的线程)可以争夺的grandTotal变量,重要的是改变它在一个线程安全的方式完成。

(我已经改变了MSDN变量的名称,以使其更清晰)

long grandTotal = 0;
Parallel.ForEach(nums,            // source collection
  () => 0,                        // method to initialize the local variable
  (j, loop, subtotal) =>          // method invoked by the loop on each iteration
     subtotal + nums[j],          // value to be passed to next iteration subtotal
  // The final value of subtotal is passed to the localFinally function parameter
  (subtotal) => Interlocked.Add(ref grandTotal, subtotal)

在MS实施例中,任务体内参数小计的修饰是差的实践,和不必要的。 即该码subtotal += nums[j]; return subtotal; subtotal += nums[j]; return subtotal; 会更好,因为刚return subtotal + nums[j]; 其可缩写为拉姆达速记投影(j, loop, subtotal) => subtotal + nums[j]

一般来说

所述localInit / body / localFinally的过载的Parallel.For / Parallel.ForEach允许一次每个任务初始化和清除码被运行之前,以及后(分别)的taskBody迭代由任务来执行。

(注意到对于范围/可枚举传递给平行For / Foreach将被划分成的批次IEnumerable<>其中的每一个将被分配一个任务,)

每个任务localInit将被称为一次, body代码将被重复调用,每件在一次批量( 0..N次), localFinally将一次完成时被调用。

此外,还可以通过对任务(即该期间所需的任何状态taskBodylocalFinally通过一个通用的委托) TLocal从返回值localInit Func -我已经叫这个变量taskLocals以下。

“localInit”的常见用法:

  • 创建和初始化通过循环体需要昂贵的资源,如数据库连接或Web服务连接。
  • 保持任务局部变量来保存(无竞争)运行总计或集合
  • 如果你需要从返回多个对象localInittaskBodylocalFinally ,你可以使用强类型类,一Tuple<,,>或者,如果你只使用lambda表达式为localInit / taskBody / localFinally ,你也可以通过经由一个匿名类的数据。 注意:如果使用从返回localInit共享多个任务之间的引用类型,你将需要考虑这个对象的线程安全性-不变性是优选的。

在“localFinally”行动的常见用法:

  • 为了释放资源,如IDisposables在使用taskLocals (如数据库连接,文件处理,Web服务客户端等)
  • 要聚合/合/减少每个任务完成的工作回到共享变量(一个或多个)。 这些共享变量会争辩,所以线程安全性的担忧:
    • 例如Interlocked.Increment对原始类型,如整数
    • lock或类似措辞将被要求写操作
    • 客人可以使用的并发集合 ,以节省时间和精力。

taskBodytight的循环操作的一部分-你要优化这个性能。

这一切最好的一个注释过的例子总结如下:

public void MyParallelizedMethod()
{
    // Shared variable. Not thread safe
    var itemCount = 0; 

    Parallel.For(myEnumerable, 
    // localInit - called once per Task.
    () => 
    {
       // Local `task` variables have no contention 
       // since each Task can never run by multiple threads concurrently
       var sqlConnection = new SqlConnection("connstring...");
       sqlConnection.Open();

       // This is the `task local` state we wish to carry for the duration of the task
       return new 
       { 
          Conn = sqlConnection,
          RunningTotal = 0
       }
    },
    // Task Body. Invoked once per item in the batch assigned to this task
    (item, loopState, taskLocals) =>
    {
      // ... Do some fancy Sql work here on our task's independent connection
      using(var command = taskLocals.Conn.CreateCommand())
      using(var reader = command.ExecuteReader(...))
      {
        if (reader.Read())
        {
           // No contention for `taskLocal`
           taskLocals.RunningTotal += Convert.ToInt32(reader["countOfItems"]);
        }
      }
      // The same type of our `taskLocal` param must be returned from the body
      return taskLocals;
    },
    // LocalFinally called once per Task after body completes
    // Also takes the taskLocal
    (taskLocals) =>
    {
       // Any cleanup work on our Task Locals (as you would do in a `finally` scope)
       if (taskLocals.Conn != null)
         taskLocals.Conn.Dispose();

       // Do any reduce / aggregate / synchronisation work.
       // NB : There is contention here!
       Interlocked.Add(ref itemCount, taskLocals.RunningTotal);
    }

还有更多的例子:

实施例的每个任务的词典非竞争

实施例的每个任务数据库连接



Answer 2:

作为扩展到@Honza Brestan的答案。 该路并行的foreach拆分工作纳入任务也很重要,它会组几个循环迭代到一个单一的任务,因此在实践中localInit()是循环的每n次迭代调用一次,多组可以同时启动。

一个点localInitlocalFinally是确保并行foreach循环可以组合每个itteration到一个结果的结果,而你需要在指定锁的语句body ,要做到这一点,你必须为你想要的值初始化创建( localInit ),那么每个body itteration可以处理当地的价值,那么你提供值从每个组(结合的方法localFinally )以线程安全的方式。

如果你不需要localInit同步任务,你可以使用拉姆达方法从周围的上下文为正常参考值,没有任何问题。 见线程在C#(的Parallel.For和Parallel.ForEach)深入教程更使用localInit /最后,向下滚动到当地的价值优化 ,约瑟夫阿尔巴哈利真的是我的一切事物线程转到源。



Answer 3:

您可以在获得MSDN上的提示正确Parallel.ForEach超载。

该localInit委托为参与循环的执行,并返回初始本地状态为每个这些任务的每个线程调用一次。 这些初始状态传递给每个任务的第一主体调用。 然后,每个后续主体调用返回传递到下一个主体调用一个可能被修改的状态的值。

在你的例子() => 0是刚好返回代表0 ,所以这个值被用于对各任务的第一次迭代。



Answer 4:

从我的身边多一点点简单的例子

class Program
{
    class Person
    {
        public int Id { get; set; }
        public string Name { get; set; }
        public int Age { get; set; }
    }

    static List<Person> GetPerson() => new List<Person>()
    {
        new Person() { Id = 0, Name = "Artur", Age = 26 },
        new Person() { Id = 1, Name = "Edward", Age = 30 },
        new Person() { Id = 2, Name = "Krzysiek", Age = 67 },
        new Person() { Id = 3, Name = "Piotr", Age = 23 },
        new Person() { Id = 4, Name = "Adam", Age = 11 },
    };

    static void Main(string[] args)
    {
        List<Person> persons = GetPerson();
        int ageTotal = 0;

        Parallel.ForEach
        (
            persons,
            () => 0,
            (person, loopState, subtotal) => subtotal + person.Age,
            (subtotal) => Interlocked.Add(ref ageTotal, subtotal)
        );

        Console.WriteLine($"Age total: {ageTotal}");
        Console.ReadKey();
    }
}


文章来源: How does local initialization with Parallel ForEach work?