Parallel.For vs ThreadPool and async/await

Posted 2019-07-25 21:16

Question:

I'm trying to process 5000 files asynchronously without letting the thread pool grow without limit. The Parallel.For loop, however, does not give me a consistently correct answer (the count comes up short), while the Task.Run version does.

What am I doing wrong in the Parallel.For loop that is causing these incorrect answers?

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static volatile int count = 0;
    static volatile int total = 0;
    static void Main(string[] args)
    {
        Parallel.For(0, 5000, new ParallelOptions { MaxDegreeOfParallelism = 10 },
            async (index) =>
            {
                string filePath = $"C:\\temp\\files\\out{index}.txt";
                var bytes = await ReadFileAsync(filePath);
                Interlocked.Add(ref total, bytes.Length);
                Interlocked.Increment(ref count);
            });
        Console.WriteLine(count);
        Console.WriteLine(total);

        count = 0;
        total = 0;
        List<Task> tasks = new List<Task>();
        foreach (int index in Enumerable.Range(0, 5000))
        {
            tasks.Add(Task.Run(async () =>
            {
                string filePath = $"C:\\temp\\files\\out{index}.txt";
                var bytes = await ReadFileAsync(filePath);
                Interlocked.Add(ref total, bytes.Length);
                Interlocked.Increment(ref count);
            }));
        }
        Task.WhenAll(tasks).Wait();
        Console.WriteLine(count);
        Console.WriteLine(total);
    }
    public static async Task<byte[]> ReadFileAsync(string filePath)
    {
        byte[] bytes = new byte[4096];
        using (var sourceStream = new FileStream(filePath,
                FileMode.Open, FileAccess.Read, FileShare.Read,
                bufferSize: 4096, useAsync: true))
        {
            await sourceStream.ReadAsync(bytes, 0, 4096);
        };
        return bytes;
    }
}

Answer 1:

Parallel.For is not async aware.

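As a result, Parallel.For does not behave as you expect. Its body parameter is an Action<int>, so your async lambda is compiled as an async void method and Parallel.For has no task to wait on. Each iteration returns as soon as it reaches its first incomplete await, so the whole loop finishes in the time it takes to start the operations, not to complete them.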

When Parallel.For returns, a number of iterations still have work pending, so their additions to count and total have not happened yet. That is why the printed values come up short.
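The effect is easy to reproduce without touching the file system. The following is a minimal sketch, not your original program: the class name AsyncVoidDemo, the RunLoop helper, and the use of Task.Delay as a stand-in for ReadFileAsync are all assumptions made for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class AsyncVoidDemo
{
    static int completed = 0;

    // Runs the loop and returns how many bodies had finished
    // at the moment Parallel.For returned.
    public static int RunLoop()
    {
        completed = 0;
        Parallel.For(0, 100, new ParallelOptions { MaxDegreeOfParallelism = 10 },
            async index =>                  // binds to Action<int>, i.e. async void
            {
                await Task.Delay(200);      // stand-in for ReadFileAsync
                Interlocked.Increment(ref completed);
            });
        return Volatile.Read(ref completed);
    }

    static void Main()
    {
        Console.WriteLine($"completed when Parallel.For returned: {RunLoop()}");
        Thread.Sleep(1000);                 // give the orphaned continuations time to run
        Console.WriteLine($"completed one second later: {Volatile.Read(ref completed)}");
    }
}
```

On a typical machine the first number is far below 100, because the loop only had to start 100 delays, and the second number catches up once the continuations have run. The exact values depend on timing.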

Stephen Toub has implemented an async version of Parallel.ForEach, called ForEachAsync. The implementation is as follows:

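using System;
using System.Collections.Concurrent;   // Partitioner
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Extension methods must live in a static class; the class name here is arbitrary.
public static class ForEachAsyncExtensions
{
    // dop is the degree of parallelism: the source is split into dop partitions,
    // and one task per partition awaits body for each element in turn.
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async delegate {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current);
            }));
    }
}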

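So you might rewrite your loop as follows, waiting on the returned task so that every iteration has finished before you print the results:

Enumerable.Range(0, 5000).ForEachAsync(10, async (index) =>
{
    //$$$ (the body of your original loop goes here)
}).Wait(); // or await it from an async method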
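For a complete picture, here is a self-contained sketch of the rewritten loop. It repeats the ForEachAsync method quoted above so that it compiles on its own. The names AsyncExtensions, ForEachAsyncDemo, RunAsync, and FakeReadAsync are hypothetical, and FakeReadAsync only simulates an asynchronous read that returns a 4096-byte buffer, so that the example runs without the files under C:\temp\files.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class AsyncExtensions
{
    // Same ForEachAsync as quoted in the answer, repeated so this sketch stands alone.
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async delegate
            {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current);
            }));
    }
}

class ForEachAsyncDemo
{
    static int count = 0;
    static int total = 0;

    // Hypothetical stand-in for ReadFileAsync: yields once so the call
    // really completes asynchronously, then returns a 4096-byte buffer.
    static async Task<byte[]> FakeReadAsync(int index)
    {
        await Task.Yield();
        return new byte[4096];
    }

    public static async Task<(int Count, int Total)> RunAsync()
    {
        count = 0;
        total = 0;
        // At most 10 bodies are in flight at once, and the await does not
        // complete until all 5000 of them have finished.
        await Enumerable.Range(0, 5000).ForEachAsync(10, async index =>
        {
            var bytes = await FakeReadAsync(index);
            Interlocked.Add(ref total, bytes.Length);
            Interlocked.Increment(ref count);
        });
        return (count, total);
    }

    static void Main()
    {
        var (c, t) = RunAsync().GetAwaiter().GetResult();
        Console.WriteLine(c);   // 5000
        Console.WriteLine(t);   // 20480000 (5000 * 4096)
    }
}
```

Because the task returned by ForEachAsync is awaited, the counters are read only after every iteration has run, which is the step the Parallel.For version was missing.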