如何从并行任务产生在.NET 4.5(How to yield from parallel task

2019-07-19 17:16发布

我想用.NET迭代器并行任务/等待? 事情是这样的:

IEnumerable<TDst> Foo<TSrc, TDest>(IEnumerable<TSrc> source)
{
    Parallel.ForEach(
        source,
        s=>
        {
            // Ordering is NOT important
            // items can be yielded as soon as they are done                
            yield return ExecuteOrDownloadSomething(s);
        }
}

不幸的是.NET本身不能处理这个问题。 最佳答案迄今为止@svick - 使用进行AsParallel()。

奖金:任何简单异步/等待实现多个出版商和单个用户代码? 订户将产生,而酒吧会处理。 (只有核心库)

Answer 1:

这似乎是对PLINQ工作:

return source.AsParallel().Select(s => ExecuteOrDownloadSomething(s));

这将使用线程的数量有限,只要它完成返回每个结果并行执行委托。

如果ExecuteOrDownloadSomething()方法IO的限制(例如,它实际上是下载的东西),你不想浪费线程,然后使用async - await可能是有意义的,但它会更加复杂。

如果你想充分利用async ,你不应该返回IEnumerable ,因为它是同步的(即它的块,如果没有项目可用)。 你需要的是某种异步收集的,并且可以使用ISourceBlock (具体TransformBlock从TPL数据流)为:

ISourceBlock<TDst> Foo<TSrc, TDest>(IEnumerable<TSrc> source)
{
    var block = new TransformBlock<TSrc, TDest>(
        async s => await ExecuteOrDownloadSomethingAsync(s),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

    foreach (var item in source)
        block.Post(item);

    block.Complete();

    return block;
}

如果来源是“慢”(即要开始处理从结果中Foo()迭代前source完成),你可能想移动foreachComplete()调用到一个单独的Task 。 更好的解决方案将是使sourceISourceBlock<TSrc>太。



Answer 2:

所以会出现你真正想要做的是基于当他们完成订购任务序列。 这是不是非常复杂的:

public static IEnumerable<Task<T>> Order<T>(this IEnumerable<Task<T>> tasks)
{
    var input = tasks.ToList();

    var output = input.Select(task => new TaskCompletionSource<T>());
    var collection = new BlockingCollection<TaskCompletionSource<T>>();
    foreach (var tcs in output)
        collection.Add(tcs);

    foreach (var task in input)
    {
        task.ContinueWith(t =>
        {
            var tcs = collection.Take();
            switch (task.Status)
            {
                case TaskStatus.Canceled:
                    tcs.TrySetCanceled();
                    break;
                case TaskStatus.Faulted:
                    tcs.TrySetException(task.Exception.InnerExceptions);
                    break;
                case TaskStatus.RanToCompletion:
                    tcs.TrySetResult(task.Result);
                    break;
            }
        }
        , CancellationToken.None
        , TaskContinuationOptions.ExecuteSynchronously
        , TaskScheduler.Default);
    }

    return output.Select(tcs => tcs.Task);
}

所以在这里我们创建了一个TaskCompletionSource为每个输入任务,然后通过每个任务和设置从一抓起下一完成源延续BlockingCollection并设置它的结果。 完成第一个任务是抓取返回的第一个TCS,第二个任务完成获取返回,等第二TCS。

现在,你的代码变得很简单:

var tasks = collection.Select(item => LongRunningOperationThatReturnsTask(item))
    .Order();
foreach(var task in tasks)
{
    var result = task.Result;//or you could `await` each result
    //....
}


Answer 3:

在由MS机器人团队提出的异步库,他们不得不并发原语,其允许使用迭代以产生异步代码。

库(CCR)是免费的(它没有使用是免费的)。 一个很好的介绍性的文章可以在这里找到: 并发事务

也许你可以使用这个库旁边的.Net任务库,或者它会激发你“滚你自己”



文章来源: How to yield from parallel tasks in .NET 4.5