我想在并行处理的集合,而是我在实施它的麻烦,所以我希望一些帮助。
麻烦的出现,如果我想调用异步标记在C#中的方法,并行循环的拉姆达内。 例如:
var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, async item =>
{
// some pre stuff
var response = await GetData(item);
bag.Add(response);
// some post stuff
}
var count = bag.Count;
与计数为0时出现问题,因为创建的所有线程都有效只是后台线程和Parallel.ForEach
调用不会等待完成。 如果我删除async关键字,方法如下:
var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, item =>
{
// some pre stuff
var responseTask = await GetData(item);
responseTask.Wait();
var response = responseTask.Result;
bag.Add(response);
// some post stuff
}
var count = bag.Count;
它的工作原理,但它完全禁用等待聪明,我必须做一些手工异常处理。(除去简洁)。
我怎样才能实现Parallel.ForEach
循环,使用该拉姆达内的await关键字? 可能吗?
该Parallel.ForEach方法的原型接受一个Action<T>
的参数,但我想它等待我的异步拉姆达。
如果你只是想简单的并行性,你可以这样做:
var bag = new ConcurrentBag<object>();
var tasks = myCollection.Select(async item =>
{
// some pre stuff
var response = await GetData(item);
bag.Add(response);
// some post stuff
});
await Task.WhenAll(tasks);
var count = bag.Count;
如果你需要更复杂的东西,看看斯蒂芬Toub的ForEachAsync
岗位 。
您可以使用ParallelForEachAsync
从扩展方法AsyncEnumerator NuGet包 :
using System.Collections.Async;
var bag = new ConcurrentBag<object>();
await myCollection.ParallelForEachAsync(async item =>
{
// some pre stuff
var response = await GetData(item);
bag.Add(response);
// some post stuff
}, maxDegreeOfParallelism: 10);
var count = bag.Count;
我创建了这个扩展方法,其利用SemaphoreSlim的,也可以设置并行度最高
/// <summary>
/// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
/// </summary>
/// <typeparam name="T">Type of IEnumerable</typeparam>
/// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
/// <param name="action">an async <see cref="Action" /> to execute</param>
/// <param name="maxDegreeOfParallelism">Optional, An integer that represents the maximum degree of parallelism,
/// Must be grater than 0</param>
/// <returns>A Task representing an async operation</returns>
/// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
public static async Task ForEachAsyncConcurrent<T>(
this IEnumerable<T> enumerable,
Func<T, Task> action,
int? maxDegreeOfParallelism = null)
{
if (maxDegreeOfParallelism.HasValue)
{
using (var semaphoreSlim = new SemaphoreSlim(
maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
{
var tasksWithThrottler = new List<Task>();
foreach (var item in enumerable)
{
// Increment the number of currently running tasks and wait if they are more than limit.
await semaphoreSlim.WaitAsync();
tasksWithThrottler.Add(Task.Run(async () =>
{
await action(item).ContinueWith(res =>
{
// action is completed, so decrement the number of currently running tasks
semaphoreSlim.Release();
});
}));
}
// Wait for all tasks to complete.
await Task.WhenAll(tasksWithThrottler.ToArray());
}
}
else
{
await Task.WhenAll(enumerable.Select(item => action(item)));
}
}
用法示例:
await enumerable.ForEachAsyncConcurrent(
async item =>
{
await SomeAsyncMethod(item);
},
5);
我的轻量级实现ParallelForEach异步的。
特征:
- 节流(最大并行度)。
- 异常处理(聚集例外将在完成抛出)。
- 内存高效(无需存储任务列表)。
public static class AsyncEx
{
public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism = 10)
{
var semaphoreSlim = new SemaphoreSlim(maxDegreeOfParallelism);
var tcs = new TaskCompletionSource<object>();
var exceptions = new ConcurrentBag<Exception>();
bool addingCompleted = false;
foreach (T item in source)
{
await semaphoreSlim.WaitAsync();
asyncAction(item).ContinueWith(t =>
{
semaphoreSlim.Release();
if (t.Exception != null)
{
exceptions.Add(t.Exception);
}
if (Volatile.Read(ref addingCompleted) && semaphoreSlim.CurrentCount == maxDegreeOfParallelism)
{
tcs.SetResult(null);
}
});
}
Volatile.Write(ref addingCompleted, true);
await tcs.Task;
if (exceptions.Count > 0)
{
throw new AggregateException(exceptions);
}
}
}
用法示例:
await Enumerable.Range(1, 10000).ParallelForEachAsync(async (i) =>
{
var data = await GetData(i);
}, maxDegreeOfParallelism: 100);