I'm trying to process 5000 files asynchronously without letting the thread pool grow without limit. The Parallel.For loop, however, is not giving me a consistently correct answer (the count comes up short), while the Task.Run version is.
What am I doing wrong in the Parallel.For loop that is causing these incorrect answers?
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static volatile int count = 0;
    static volatile int total = 0;

    static void Main(string[] args)
    {
        Parallel.For(0, 5000, new ParallelOptions { MaxDegreeOfParallelism = 10 },
            async (index) =>
            {
                string filePath = $"C:\\temp\\files\\out{index}.txt";
                var bytes = await ReadFileAsync(filePath);
                Interlocked.Add(ref total, bytes.Length);
                Interlocked.Increment(ref count);
            });
        Console.WriteLine(count);
        Console.WriteLine(total);

        count = 0;
        total = 0;
        List<Task> tasks = new List<Task>();
        foreach (int index in Enumerable.Range(0, 5000))
        {
            tasks.Add(Task.Run(async () =>
            {
                string filePath = $"C:\\temp\\files\\out{index}.txt";
                var bytes = await ReadFileAsync(filePath);
                Interlocked.Add(ref total, bytes.Length);
                Interlocked.Increment(ref count);
            }));
        }
        Task.WhenAll(tasks).Wait();
        Console.WriteLine(count);
        Console.WriteLine(total);
    }

    public static async Task<byte[]> ReadFileAsync(string filePath)
    {
        byte[] bytes = new byte[4096];
        using (var sourceStream = new FileStream(filePath,
            FileMode.Open, FileAccess.Read, FileShare.Read,
            bufferSize: 4096, useAsync: true))
        {
            await sourceStream.ReadAsync(bytes, 0, 4096);
        }
        return bytes;
    }
}

Parallel.For is not async-aware. As such, the Parallel.For is not performing as you expect. Because the task generated by the async lambda is not awaited, all of the iterations will complete in the time it takes to create the tasks, not the time it takes to complete them.
After your Parallel.For, a number of iterations will still have a pending task that is not yet complete, and therefore your additions to count and total have not yet completed.
Stephen Toub has implemented an async version of Parallel.ForEach (ForEachAsync). The implementation is as follows:
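A sketch of that extension method as it is commonly quoted, written from memory, so treat the exact shape (the dop parameter name, the use of Partitioner) as an assumption rather than a verbatim copy. The class name AsyncEnumerableExtensions is hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class AsyncEnumerableExtensions // hypothetical class name
{
    // Splits source into dop partitions and drains each partition on its own task,
    // awaiting body for one element before moving to the next. The returned task
    // completes only when every partition has been fully processed.
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async () =>
            {
                using (partition)
                {
                    while (partition.MoveNext())
                    {
                        await body(partition.Current);
                    }
                }
            }));
    }
}
```

Because each partition awaits its current body before taking the next element, at most dop bodies are in flight at once, and awaiting the returned task (or calling .Wait() on it from a non-async Main) waits for the async work itself rather than just for the tasks to be created.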
So you might rewrite your loop:
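A sketch of the rewritten loop, assuming .NET 6 or later, where a built-in Parallel.ForEachAsync is available; it is used here in place of the extension-method ForEachAsync so that the example compiles on its own. ProcessFilesAsync is a hypothetical wrapper name introduced for illustration; the path pattern, the limit of 10, and ReadFileAsync come from the question.

```csharp
using System;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class Program
{
    public static async Task Main()
    {
        // Directory and file count are the ones used in the question.
        var (count, total) = await ProcessFilesAsync("C:\\temp\\files", 5000);
        Console.WriteLine(count);
        Console.WriteLine(total);
    }

    // Hypothetical wrapper: reads out0.txt .. out{fileCount - 1}.txt from directory
    // with at most 10 iterations running at a time, and returns the number of files
    // read plus the summed buffer lengths (the same quantities the question tracks).
    public static async Task<(int Count, int Total)> ProcessFilesAsync(string directory, int fileCount)
    {
        int count = 0;
        int total = 0;
        var options = new ParallelOptions { MaxDegreeOfParallelism = 10 };

        // Unlike Parallel.For, the task returned here completes only after every
        // async body has finished, so both counters are final once it is awaited.
        await Parallel.ForEachAsync(Enumerable.Range(0, fileCount), options,
            async (index, cancellationToken) =>
            {
                string filePath = Path.Combine(directory, $"out{index}.txt");
                var bytes = await ReadFileAsync(filePath);
                Interlocked.Add(ref total, bytes.Length);
                Interlocked.Increment(ref count);
            });

        return (count, total);
    }

    // Same helper as in the question: reads up to 4096 bytes into a fixed buffer.
    public static async Task<byte[]> ReadFileAsync(string filePath)
    {
        byte[] bytes = new byte[4096];
        using (var sourceStream = new FileStream(filePath,
            FileMode.Open, FileAccess.Read, FileShare.Read,
            bufferSize: 4096, useAsync: true))
        {
            await sourceStream.ReadAsync(bytes, 0, 4096);
        }
        return bytes;
    }
}
```

On older frameworks the same body can be passed to an extension-method ForEachAsync instead, with the degree of parallelism as an argument and .Wait() on the returned task if Main cannot be async.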