
Is it OK to do some async/await inside some .NET Parallel.ForEach() code?

Posted 2019-03-16 06:32

Question:

Given the following code, is it OK to do async/await inside a Parallel.ForEach?

E.g.:

Parallel.ForEach(names, async name => // the lambda must be marked async for 'await' to compile
{
    // Do some stuff...

    var foo = await GetStuffFrom3rdPartyAsync(name);

    // Do some more stuff, with the foo.
});

Or are there some gotchas that I need to be aware of?

EDIT: No idea if this compiles, btw. Just pseudo-code, thinking out loud.

Answer 1:

No, it doesn't make sense to combine async with Parallel.ForEach.

Consider the following example:

private void DoSomething()
{
    var names = Enumerable.Range(0,10).Select(x=> "Somename" + x);
    Parallel.ForEach(names, async(name) =>
    {   
        await Task.Delay(1000);
        Console.WriteLine("Name {0} completed",name);
    });
    Console.WriteLine("Parallel ForEach completed");
}

What output would you expect?

Name Somename3 completed
Name Somename8 completed
Name Somename4 completed
...
Parallel ForEach completed

That's not what will happen. It will output:

Parallel ForEach completed
Name Somename3 completed
Name Somename8 completed
Name Somename4 completed
...

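Why? Parallel.ForEach only accepts a synchronous Action<T>, so the async lambda is compiled as an async void delegate. When a body hits its first await, it returns to Parallel.ForEach, which has no way of knowing that the work is still in progress and treats that iteration as complete. The code after the await runs later as a continuation, typically on a thread-pool thread rather than on one of Parallel.ForEach's worker threads.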

Stephen Toub addressed this here.
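A minimal sketch of the effect described above, using only the BCL. The class and method names are hypothetical and Task.Delay stands in for real asynchronous work. It counts how many loop bodies have finished at the moment Parallel.ForEach returns, and again after a crude sleep, because there is no Task to await.

```csharp
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncLambdaInParallelForEach
{
    // Hypothetical demo: how many bodies have finished when Parallel.ForEach returns,
    // and how many have finished after sleeping well past the simulated work.
    public static (int atReturn, int afterSleep) Run(int itemCount, int delayMs)
    {
        int completed = 0;

        // Parallel.ForEach only takes an Action<T>, so this async lambda becomes an
        // "async void" delegate: each call returns to Parallel.ForEach at its first await.
        Parallel.ForEach(Enumerable.Range(0, itemCount), async i =>
        {
            await Task.Delay(delayMs);             // the body "returns" here
            Interlocked.Increment(ref completed);  // runs later as a continuation
        });

        int atReturn = Volatile.Read(ref completed);

        // There is no Task to await, so the only option is a crude, timing-based wait.
        Thread.Sleep(delayMs * 5);
        int afterSleep = Volatile.Read(ref completed);

        return (atReturn, afterSleep);
    }
}
```

Calling Run(4, 300) should report 0 completed bodies when Parallel.ForEach returns; the second number depends on timing, which is exactly the problem with fire-and-forget async void bodies.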



Answer 2:

A close alternative might be this:

static void ForEach<T>(IEnumerable<T> data, Func<T, Task> func)
{
    var tasks = data.Select(item => 
        Task.Run(() => func(item)));

    Task.WaitAll(tasks.ToArray());
}

// ... 

ForEach(names, name => GetStuffFrom3rdPartyAsync(name));

Ideally, you shouldn't use a blocking call like Task.WaitAll if you can make the whole chain of method calls async, "all the way down" the current call stack:

var tasks = data.Select(item => 
    Task.Run(() => func(item)));

await Task.WhenAll(tasks.ToArray());

Furthermore, if you don't do any CPU-bound work inside GetStuffFrom3rdPartyAsync, Task.Run may be redundant:

var tasks = data.Select(item => func(item));
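Putting the last two snippets together, a minimal sketch of the non-blocking version, assuming the work is I/O-bound. GetStuffFrom3rdPartyAsync here is a hypothetical stub that only delays, and ForEachAsync/ProcessAsync are illustrative names, not library APIs.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class AsyncAllTheWayDown
{
    // Starts func for every item, then asynchronously waits for all of them.
    // No Task.Run: each call runs synchronously up to its first await,
    // which is enough when the work is I/O-bound.
    public static Task ForEachAsync<T>(IEnumerable<T> data, Func<T, Task> func) =>
        Task.WhenAll(data.Select(func));

    // Hypothetical stand-in for GetStuffFrom3rdPartyAsync: simulated I/O.
    public static async Task<string> GetStuffFrom3rdPartyAsync(string name)
    {
        await Task.Delay(100);
        return "stuff for " + name;
    }

    public static async Task<IReadOnlyCollection<string>> ProcessAsync(IEnumerable<string> names)
    {
        var results = new ConcurrentBag<string>();   // bodies may complete on different threads
        await ForEachAsync(names, async name =>
        {
            var foo = await GetStuffFrom3rdPartyAsync(name);
            results.Add(foo);
        });
        return results;
    }
}
```

The caller awaits ProcessAsync instead of blocking, so the "all the way down" rule holds as long as its own callers are async too.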


Answer 3:

From the name, I'm assuming that GetStuffFrom3rdPartyAsync is I/O-bound. The Parallel class is specifically for CPU-bound code.

In the asynchronous world, you can start multiple tasks and then (asynchronously) wait for them all to complete using Task.WhenAll. Since you're starting with a sequence, it's probably easiest to project each element to an asynchronous operation, and then await all of those operations:

await Task.WhenAll(names.Select(async name =>
{
  // Do some stuff...
  var foo = await GetStuffFrom3rdPartyAsync(name);
  // Do some more stuff, with the foo.
}));
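A self-contained sketch of the same pattern that also collects a result per name. GetStuffFrom3rdPartyAsync is again a hypothetical stub; its delay shrinks for longer names so that completion order differs from input order. It relies on Task.WhenAll<TResult>, which returns the results in the order of the input tasks, not the order in which they finish.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ProjectAndAwait
{
    // Hypothetical stand-in for the asker's GetStuffFrom3rdPartyAsync (simulated I/O).
    private static async Task<int> GetStuffFrom3rdPartyAsync(string name)
    {
        await Task.Delay(200 / (name.Length + 1));   // longer names finish sooner
        return name.Length;
    }

    // Projects each name to an async operation and awaits them all at once.
    public static Task<int[]> GetAllAsync(IEnumerable<string> names) =>
        Task.WhenAll(names.Select(async name =>
        {
            // Do some stuff...
            var foo = await GetStuffFrom3rdPartyAsync(name);
            // Do some more stuff, with the foo.
            return foo * 10;
        }));
}
```

For example, awaiting GetAllAsync(new[] { "a", "bbb", "cc" }) yields 10, 30, 20 in that order, even though "bbb" completes first.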


Answer 4:

As pointed out by @Sriram Sakthivel, there are some problems with using Parallel.ForEach with asynchronous lambdas. Stephen Toub's ForEachAsync can do the equivalent. He talks about it here, but here is the code:

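using System;
using System.Collections.Concurrent;   // Partitioner
using System.Collections.Generic;
using System.Linq;                     // query syntax over the partitions
using System.Threading.Tasks;

public static class Extensions
{
    // Splits source into dop partitions; each partition is drained by one task
    // that awaits body for its items one after another.
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async delegate
            {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current);
            }));
    }
}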

It uses the Partitioner class to create a load-balancing partitioner, and the dop parameter lets you specify the degree of parallelism, i.e. how many partitions (and therefore how many bodies at most) are processed concurrently. To see the difference between it and Parallel.ForEach, try the following code:

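using System;
using System.Linq;
using System.Threading.Tasks;

// Requires the Extensions class above.
class Program
{
    // Parallel.ForEach treats each async lambda as async void, so it returns as soon as
    // every body has reached its first await. (No await in this method itself, so the
    // compiler reports warning CS1998; that is the point of the demo.)
    public static async Task GetStuffParallelForEach()
    {
        var data = Enumerable.Range(1, 10);
        Parallel.ForEach(data, async i =>
        {
            await Task.Delay(1000 * i);
            Console.WriteLine(i);
        });
    }

    // ForEachAsync returns a Task that completes only after every body has finished.
    public static async Task GetStuffForEachAsync()
    {
        var data = Enumerable.Range(1, 10);
        await data.ForEachAsync(5, async i =>
        {
            await Task.Delay(1000 * i);
            Console.WriteLine(i);
        });
    }

    static void Main(string[] args)
    {
        //GetStuffParallelForEach().Wait(); // "Finished" is printed before the work is complete
        GetStuffForEachAsync().Wait();      // "Finished" is printed after all work is done
        Console.WriteLine("Finished");
        Console.ReadLine();
    }
}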

If you run GetStuffForEachAsync, the program waits for all the work to finish. If you run GetStuffParallelForEach, the line "Finished" will be printed before the work is finished.
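To check that dop really caps how many bodies are in flight at once, here is a small sketch that copies the ForEachAsync extension above (so it compiles on its own) and records the peak number of concurrently running bodies. DopCheck and PeakConcurrencyAsync are hypothetical names, and Task.Delay stands in for real I/O.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class DopCheck
{
    // Same ForEachAsync as in the answer, copied so this block is self-contained.
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async delegate
            {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current);
            }));
    }

    // Runs `count` simulated I/O operations and reports the peak number in flight.
    public static async Task<int> PeakConcurrencyAsync(int count, int dop)
    {
        int inFlight = 0, peak = 0;
        await Enumerable.Range(0, count).ForEachAsync(dop, async i =>
        {
            int now = Interlocked.Increment(ref inFlight);

            // Lock-free "max": retry until peak >= now or our CompareExchange wins.
            int seen;
            while (now > (seen = Volatile.Read(ref peak)) &&
                   Interlocked.CompareExchange(ref peak, now, seen) != seen) { }

            await Task.Delay(20);
            Interlocked.Decrement(ref inFlight);
        });
        return peak;
    }
}
```

Because each partition awaits one body before starting the next, the peak can never exceed dop; on an idle machine it will usually equal dop exactly.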