In a metro app, I need to execute a number of WCF calls. There are a significant number of calls to be made, so I need to do them in a parallel loop. The problem is that the parallel loop exits before the WCF calls are all complete.
How would you refactor this to work as expected?
    var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
    var customers = new System.Collections.Concurrent.BlockingCollection<Customer>();

    Parallel.ForEach(ids, async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        var cust = await repo.GetCustomer(i);
        customers.Add(cust);
    });

    foreach (var customer in customers)
    {
        Console.WriteLine(customer.ID);
    }

    Console.ReadKey();
Wrap the Parallel.ForEach in a Task.Run() and, instead of the await keyword, use [yourasyncmethod].Result (you need the Task.Run so that you do not block the UI thread).
Something like this:
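A minimal sketch of that approach. Customer, ICustomerRepo and CustomerRepo here are hypothetical stand-ins for the question's types (the Task.Delay only simulates the WCF round trip), and BlockingDemo and LoadCustomersAsync are names made up for illustration:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-ins for the question's types.
public class Customer { public string ID { get; set; } = ""; }

public interface ICustomerRepo { Task<Customer> GetCustomer(string id); }

public class CustomerRepo : ICustomerRepo
{
    public async Task<Customer> GetCustomer(string id)
    {
        await Task.Delay(50); // simulates the WCF round trip
        return new Customer { ID = id };
    }
}

public static class BlockingDemo
{
    public static Task<List<Customer>> LoadCustomersAsync(IEnumerable<string> ids)
    {
        // Task.Run moves the blocking loop off the calling (UI) thread.
        return Task.Run(() =>
        {
            var customers = new ConcurrentBag<Customer>();
            Parallel.ForEach(ids, id =>
            {
                ICustomerRepo repo = new CustomerRepo();
                // .Result blocks this worker thread until the call completes,
                // so Parallel.ForEach only returns once every call is done.
                customers.Add(repo.GetCustomer(id).Result);
            });
            return customers.ToList();
        });
    }

    public static async Task Main()
    {
        var ids = Enumerable.Range(1, 10).Select(n => n.ToString()).ToList();
        var customers = await LoadCustomersAsync(ids);
        foreach (var customer in customers)
            Console.WriteLine(customer.ID); // order is not guaranteed
    }
}
```

Each loop iteration holds a thread-pool thread for the whole duration of its call, which is the price of this variant.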
You can save effort with the new AsyncEnumerator NuGet Package, which didn't exist 4 years ago when the question was originally posted. It allows you to control the degree of parallelism:
Disclaimer: I'm the author of the AsyncEnumerator library, which is open source and licensed under MIT, and I'm posting this message just to help the community.
The whole idea behind Parallel.ForEach() is that you have a set of threads and each thread processes part of the collection. As you noticed, this doesn't work with async-await, where you want to release the thread for the duration of the async call.
You could “fix” that by blocking the ForEach() threads, but that defeats the whole point of async-await.
What you could do is use TPL Dataflow, which supports asynchronous Tasks well, instead of Parallel.ForEach().
Specifically, your code could be written using a TransformBlock that transforms each id into a Customer using the async lambda. This block can be configured to execute in parallel. You would link that block to an ActionBlock that writes each Customer to the console. After you set up the block network, you can Post() each id to the TransformBlock.
In code:
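A sketch of the block network described above, assuming the System.Threading.Tasks.Dataflow library is available. Customer, ICustomerRepo and CustomerRepo are hypothetical stand-ins for the question's types, and DataflowDemo, ProcessAsync and the returned list of ids exist only to make the example self-contained:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-ins for the question's types.
public class Customer { public string ID { get; set; } = ""; }

public interface ICustomerRepo { Task<Customer> GetCustomer(string id); }

public class CustomerRepo : ICustomerRepo
{
    public async Task<Customer> GetCustomer(string id)
    {
        await Task.Delay(50); // simulates the WCF round trip
        return new Customer { ID = id };
    }
}

public static class DataflowDemo
{
    public static async Task<List<string>> ProcessAsync(IEnumerable<string> ids)
    {
        var written = new List<string>();

        // Transforms each id into a Customer; the async lambda releases
        // the thread while the call is in flight.
        var getCustomerBlock = new TransformBlock<string, Customer>(
            async id =>
            {
                ICustomerRepo repo = new CustomerRepo();
                return await repo.GetCustomer(id);
            },
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
            });

        // Runs one item at a time by default, so the plain List is safe here.
        var writeCustomerBlock = new ActionBlock<Customer>(customer =>
        {
            Console.WriteLine(customer.ID);
            written.Add(customer.ID);
        });

        getCustomerBlock.LinkTo(
            writeCustomerBlock,
            new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var id in ids)
            getCustomerBlock.Post(id);

        // Signal that no more ids are coming, then wait for the last write.
        getCustomerBlock.Complete();
        await writeCustomerBlock.Completion;
        return written;
    }

    public static async Task Main()
    {
        var ids = Enumerable.Range(1, 10).Select(n => n.ToString());
        await ProcessAsync(ids);
    }
}
```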
Although you probably want to limit the parallelism of the TransformBlock to some small constant. Also, you could limit the capacity of the TransformBlock and add the items to it asynchronously using SendAsync(), for example if the collection is too big.
An added benefit compared to your code (if it worked) is that the writing will start as soon as a single item is finished, and not wait until all of the processing is finished.
An extension method for this that uses SemaphoreSlim and also lets you set the maximum degree of parallelism:
Sample Usage:
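A minimal sketch of such an extension method together with a sample call. The name ForEachAsync, its signature and the limit of 3 are assumptions made for illustration, and Customer and CustomerRepo are hypothetical stand-ins for the question's types:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncExtensions
{
    // Hypothetical helper: runs body for every item, with at most
    // maxDegreeOfParallelism invocations in flight at any time.
    public static async Task ForEachAsync<T>(
        this IEnumerable<T> source,
        Func<T, Task> body,
        int maxDegreeOfParallelism)
    {
        if (maxDegreeOfParallelism < 1)
            throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));

        using var throttle = new SemaphoreSlim(maxDegreeOfParallelism);
        var tasks = new List<Task>();

        foreach (var item in source)
        {
            // Waits (without blocking a thread) until a slot is free.
            await throttle.WaitAsync();
            tasks.Add(RunAsync(item));
        }

        await Task.WhenAll(tasks);

        async Task RunAsync(T item)
        {
            try { await body(item); }
            finally { throttle.Release(); } // free the slot even on failure
        }
    }
}

// Hypothetical stand-ins for the question's types.
public class Customer { public string ID { get; set; } = ""; }

public class CustomerRepo
{
    public async Task<Customer> GetCustomer(string id)
    {
        await Task.Delay(50); // simulates the WCF round trip
        return new Customer { ID = id };
    }
}

public static class SemaphoreDemo
{
    public static async Task Main()
    {
        var ids = Enumerable.Range(1, 10).Select(n => n.ToString());
        var customers = new ConcurrentBag<Customer>();

        await ids.ForEachAsync(
            async id => customers.Add(await new CustomerRepo().GetCustomer(id)),
            maxDegreeOfParallelism: 3);

        foreach (var customer in customers)
            Console.WriteLine(customer.ID); // order is not guaranteed
    }
}
```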
I am a little late to the party, but you may want to consider using GetAwaiter().GetResult() to run your async code in a synchronous context, yet still in parallel, as below:
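A minimal sketch of that idea, assuming a Parallel.ForEach loop around the blocking call. Customer and CustomerRepo are hypothetical stand-ins for the question's types, GetResultDemo and LoadCustomers are made-up names, and the limit of 4 is an arbitrary assumption:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-ins for the question's types.
public class Customer { public string ID { get; set; } = ""; }

public class CustomerRepo
{
    public async Task<Customer> GetCustomer(string id)
    {
        await Task.Delay(50); // simulates the WCF round trip
        return new Customer { ID = id };
    }
}

public static class GetResultDemo
{
    public static List<Customer> LoadCustomers(IEnumerable<string> ids)
    {
        var customers = new ConcurrentBag<Customer>();
        var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };

        Parallel.ForEach(ids, options, id =>
        {
            var repo = new CustomerRepo();
            // Blocks this worker thread until the async call has finished.
            customers.Add(repo.GetCustomer(id).GetAwaiter().GetResult());
        });

        return customers.ToList();
    }

    public static void Main()
    {
        var ids = Enumerable.Range(1, 10).Select(n => n.ToString());
        foreach (var customer in LoadCustomers(ids))
            Console.WriteLine(customer.ID); // order is not guaranteed
    }
}
```

As with .Result, every in-flight call occupies a thread, and calling LoadCustomers directly from a UI thread would block that thread until the loop ends.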
After introducing a bunch of helper methods, you will be able to run parallel queries with this simple syntax:
What happens here is we split the source collection into 10 chunks (.Split(DegreeOfParallelism)), then run 10 tasks, each processing its items one by one (.SelectManyAsync(...)), and merge those back into a single list.
Worth mentioning there is a simpler approach:
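One way to read the simpler approach is to start one task per item and await them all with Task.WhenAll. This is an assumed interpretation; Customer and CustomerRepo are hypothetical stand-ins for the question's types, and WhenAllDemo is a made-up name:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-ins for the question's types.
public class Customer { public string ID { get; set; } = ""; }

public class CustomerRepo
{
    public async Task<Customer> GetCustomer(string id)
    {
        await Task.Delay(50); // simulates the WCF round trip
        return new Customer { ID = id };
    }
}

public static class WhenAllDemo
{
    public static async Task<Customer[]> LoadCustomersAsync(IEnumerable<string> ids)
    {
        // Select starts one call per id; WhenAll completes when all have
        // finished and returns the results in the order of the input tasks.
        return await Task.WhenAll(
            ids.Select(id => new CustomerRepo().GetCustomer(id)));
    }

    public static async Task Main()
    {
        var ids = Enumerable.Range(1, 10).Select(n => n.ToString());
        foreach (var customer in await LoadCustomersAsync(ids))
            Console.WriteLine(customer.ID);
    }
}
```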
But it needs a precaution: if you have a source collection that is too big, it will schedule a Task for every item right away, which may cause significant performance hits.
Extension methods used in examples above look as follows:
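A hedged sketch of what such helpers might look like. The names Split and SelectManyAsync come from the text above, but the signatures, the round-robin chunking and the DegreeOfParallelism constant are assumptions; Customer and CustomerRepo are hypothetical stand-ins for the question's types:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ChunkingExtensions
{
    // Distributes the items round-robin over partCount chunks.
    public static List<List<T>> Split<T>(this IEnumerable<T> source, int partCount)
    {
        if (partCount < 1)
            throw new ArgumentOutOfRangeException(nameof(partCount));

        var parts = Enumerable.Range(0, partCount)
            .Select(unused => new List<T>())
            .ToList();
        var index = 0;
        foreach (var item in source)
            parts[index++ % partCount].Add(item);
        return parts;
    }

    // Runs one task per chunk; inside a chunk the items are processed
    // one by one. The chunk results are merged into a single list.
    public static async Task<List<TResult>> SelectManyAsync<T, TResult>(
        this IEnumerable<List<T>> parts,
        Func<T, Task<TResult>> selector)
    {
        var partResults = await Task.WhenAll(parts.Select(async part =>
        {
            var results = new List<TResult>();
            foreach (var item in part)
                results.Add(await selector(item));
            return results;
        }));
        return partResults.SelectMany(r => r).ToList();
    }
}

// Hypothetical stand-ins for the question's types.
public class Customer { public string ID { get; set; } = ""; }

public class CustomerRepo
{
    public async Task<Customer> GetCustomer(string id)
    {
        await Task.Delay(50); // simulates the WCF round trip
        return new Customer { ID = id };
    }
}

public static class ChunkingDemo
{
    private const int DegreeOfParallelism = 10; // assumed value

    public static async Task Main()
    {
        var ids = Enumerable.Range(1, 25).Select(n => n.ToString());
        var customers = await ids
            .Split(DegreeOfParallelism)
            .SelectManyAsync(id => new CustomerRepo().GetCustomer(id));

        // Round-robin chunking means the merged list is not in input order.
        Console.WriteLine(customers.Count);
    }
}
```

At most DegreeOfParallelism calls are in flight at once, because each chunk awaits its items sequentially.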