-->

Passing async method into Parallel.ForEach

2020-02-13 04:36发布

问题:

I was reading this post about Parallel.ForEach where it was stated that "Parallel.ForEach is not compatible with passing in a async method."

So, to check I write this code:

static async Task Main(string[] args)
{
    var results = new ConcurrentDictionary<string, int>();

    Parallel.ForEach(Enumerable.Range(0, 100), async index =>
    {
        var res = await DoAsyncJob(index);
        results.TryAdd(index.ToString(), res);
    });         

    Console.ReadLine();
}

static async Task<int> DoAsyncJob(int i)
{
    Thread.Sleep(100);
    return await Task.FromResult(i * 10);
}

This code fills in the results dictionary concurrently.

By the way, I created a dictionary of type ConcurrentDictionary<string, int> because in case I have ConcurrentDictionary<int, int> when I explore its elements in debug mode I see that elements are sorted by the key and I thought that elenents was added consequently.

So, I want to know is my code is valid? If it "is not compatible with passing in a async method" why it works well?

回答1:

This code works only because DoAsyncJob isn't really an asynchronous method. async doesn't make a method work asynchronously. Awaiting a completed task like that returned by Task.FromResult is synchronous too. async Task Main doesn't contain any asynchronous code, which results in a compiler warning.

An example that demonstrates how Parallel.ForEach doesn't work with asynchronous methods should call a real asynchronous method:

    static async Task Main(string[] args)
    {
        var results = new ConcurrentDictionary<string, int>();

        Parallel.ForEach(Enumerable.Range(0, 100), async index =>
        {
            var res = await DoAsyncJob(index);
            results.TryAdd(index.ToString(), res);
        });  
        Console.WriteLine($"Items in dictionary {results.Count}");
    }

    static async Task<int> DoAsyncJob(int i)
    {
        await Task.Delay(100);
        return i * 10;
    }

The result will be

Items in dictionary 0

Parallel.ForEach has no overload accepting a Func<Task>, it accepts only Action delegates. This means it can't await any asynchronous operations.

async index is accepted because it's implicitly an async void delegate. As far as Parallel.ForEach is concerned, it's just an Action<int>.

The result is that Parallel.ForEach fires off 100 tasks and never waits for them to complete. That's why the dictionary is still empty when the application terminates.



回答2:

An async method is one that starts and returns a Task.

Your code here

Parallel.ForEach(Enumerable.Range(0, 100), async index =>
{
    var res = await DoAsyncJob(index);
    results.TryAdd(index.ToString(), res);
});        

runs async methods 100 times in parallel. That's to say it parallelises the task creation, not the whole task. By the time ForEach has returned, your tasks are running but they are not necessarily complete.

You code works because DoAsyncJob() not actually asynchronous - your Task is completed upon return. Thread.Sleep() is a synchronous method. Task.Delay() is its asynchronous equivalent.

Understand the difference between CPU-bound and I/O-bound operations. As others have already pointed out, parallelism (and Parallel.ForEach) is for CPU-bound operations and asynchronous programming is not appropriate.



回答3:

If you already have asynchronous work, you don't need Parallel.ForEach:

static async Task Main(string[] args)
{

    var results = await new Task.WhenAll(
        Enumerable.Range(0, 100)
        Select(i => DoAsyncJob(I)));

    Console.ReadLine();
}

Regarding your async job, you either go async all the way:

static async Task<int> DoAsyncJob(int i)
{
    await Task.Delay(100);
    return await Task.FromResult(i * 10);
}

Better yet:

static async Task<int> DoAsyncJob(int i)
{
    await Task.Delay(100);
    return i * 10;
}

or not at all:

static Task<int> DoAsyncJob(int i)
{
    Thread.Sleep(100);
    return Task.FromResult(i * 10);
}