我想用.NET迭代器并行任务/等待? 事情是这样的:
IEnumerable<TDst> Foo<TSrc, TDest>(IEnumerable<TSrc> source)
{
Parallel.ForEach(
source,
s=>
{
// Ordering is NOT important
// items can be yielded as soon as they are done
yield return ExecuteOrDownloadSomething(s);
}
}
不幸的是.NET本身不能处理这个问题。 最佳答案迄今为止@svick - 使用进行AsParallel()。
奖金:任何简单异步/等待实现多个出版商和单个用户代码? 订户将产生,而酒吧会处理。 (只有核心库)
这似乎是对PLINQ工作:
return source.AsParallel().Select(s => ExecuteOrDownloadSomething(s));
这将使用线程的数量有限,只要它完成返回每个结果并行执行委托。
如果ExecuteOrDownloadSomething()
方法IO的限制(例如,它实际上是下载的东西),你不想浪费线程,然后使用async
- await
可能是有意义的,但它会更加复杂。
如果你想充分利用async
,你不应该返回IEnumerable
,因为它是同步的(即它的块,如果没有项目可用)。 你需要的是某种异步收集的,并且可以使用ISourceBlock
(具体TransformBlock
从TPL数据流)为:
ISourceBlock<TDst> Foo<TSrc, TDest>(IEnumerable<TSrc> source)
{
var block = new TransformBlock<TSrc, TDest>(
async s => await ExecuteOrDownloadSomethingAsync(s),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
foreach (var item in source)
block.Post(item);
block.Complete();
return block;
}
如果来源是“慢”(即要开始处理从结果中Foo()
迭代前source
完成),你可能想移动foreach
和Complete()
调用到一个单独的Task
。 更好的解决方案将是使source
成ISourceBlock<TSrc>
太。
所以会出现你真正想要做的是基于当他们完成订购任务序列。 这是不是非常复杂的:
public static IEnumerable<Task<T>> Order<T>(this IEnumerable<Task<T>> tasks)
{
var input = tasks.ToList();
var output = input.Select(task => new TaskCompletionSource<T>());
var collection = new BlockingCollection<TaskCompletionSource<T>>();
foreach (var tcs in output)
collection.Add(tcs);
foreach (var task in input)
{
task.ContinueWith(t =>
{
var tcs = collection.Take();
switch (task.Status)
{
case TaskStatus.Canceled:
tcs.TrySetCanceled();
break;
case TaskStatus.Faulted:
tcs.TrySetException(task.Exception.InnerExceptions);
break;
case TaskStatus.RanToCompletion:
tcs.TrySetResult(task.Result);
break;
}
}
, CancellationToken.None
, TaskContinuationOptions.ExecuteSynchronously
, TaskScheduler.Default);
}
return output.Select(tcs => tcs.Task);
}
所以在这里我们创建了一个TaskCompletionSource
为每个输入任务,然后通过每个任务和设置从一抓起下一完成源延续BlockingCollection
并设置它的结果。 完成第一个任务是抓取返回的第一个TCS,第二个任务完成获取返回,等第二TCS。
现在,你的代码变得很简单:
var tasks = collection.Select(item => LongRunningOperationThatReturnsTask(item))
.Order();
foreach(var task in tasks)
{
var result = task.Result;//or you could `await` each result
//....
}
在由MS机器人团队提出的异步库,他们不得不并发原语,其允许使用迭代以产生异步代码。
库(CCR)是免费的(它没有使用是免费的)。 一个很好的介绍性的文章可以在这里找到: 并发事务
也许你可以使用这个库旁边的.Net任务库,或者它会激发你“滚你自己”