I would like to use a .NET iterator with parallel Tasks/await. Something like this:
IEnumerable<TDest> Foo<TSrc, TDest>(IEnumerable<TSrc> source)
{
    Parallel.ForEach(
        source,
        s =>
        {
            // Ordering is NOT important
            // items can be yielded as soon as they are done
            // (wishful code: yield return is not allowed inside a lambda)
            yield return ExecuteOrDownloadSomething(s);
        });
}
Unfortunately, C# cannot do this natively: `yield return` is not allowed inside a lambda or anonymous method. The best answer so far, by @svick, is to use AsParallel().
BONUS: Is there any simple async/await code that implements multiple publishers and a single subscriber? The subscriber would yield, and the publishers would do the processing. (Core libraries only.)
This seems like a job for PLINQ:
return source.AsParallel().Select(s => ExecuteOrDownloadSomething(s));
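This will execute the delegate in parallel using a limited number of threads and return the results in no particular order. Note that by default PLINQ may buffer results before handing them to the consumer; to get each result as soon as it completes, add .WithMergeOptions(ParallelMergeOptions.NotBuffered) before the Select().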
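A runnable sketch of the PLINQ approach with unbuffered merging. The int types and the body of ExecuteOrDownloadSomething (a hypothetical stand-in that sleeps briefly and squares its input) are assumptions made for the example.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public static class PlinqStreaming
{
    // Hypothetical stand-in for ExecuteOrDownloadSomething: sleeps for a
    // varying time to simulate work, then returns the square of the input.
    static int ExecuteOrDownloadSomething(int s)
    {
        Thread.Sleep(10 * (s % 3));
        return s * s;
    }

    // Runs the work on a limited number of pool threads. Results arrive in
    // no particular order; NotBuffered hands each one to the consumer as
    // soon as a worker produces it instead of batching them.
    public static IEnumerable<int> Foo(IEnumerable<int> source) =>
        source.AsParallel()
              .WithMergeOptions(ParallelMergeOptions.NotBuffered)
              .Select(s => ExecuteOrDownloadSomething(s));
}
```

NotBuffered lowers the latency of each individual result at some possible cost in overall throughput, which suits the "yield as soon as they are done" requirement.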
If the ExecuteOrDownloadSomething() method is IO-bound (e.g. it actually downloads something) and you don't want to waste threads, then using async/await might make sense, but it would be more complicated.
If you want to take full advantage of async, you shouldn't return IEnumerable, because it's synchronous (i.e. it blocks if no items are available). What you need is some sort of asynchronous collection, and you can use ISourceBlock (specifically, TransformBlock) from TPL Dataflow for that:
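// TPL Dataflow (System.Threading.Tasks.Dataflow NuGet package)
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

ISourceBlock<TDest> Foo<TSrc, TDest>(IEnumerable<TSrc> source)
{
    var block = new TransformBlock<TSrc, TDest>(
        async s => await ExecuteOrDownloadSomethingAsync(s),
        new ExecutionDataflowBlockOptions
        {
            // Process as many items concurrently as possible (fine for IO-bound work)
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

    foreach (var item in source)
        block.Post(item);
    // No more input: Completion finishes once every item is processed and received
    block.Complete();

    return block;
}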
If the source is “slow” (i.e. you want to start processing the results from Foo() before iterating source has completed), you might want to move the foreach and the Complete() call to a separate Task. An even better solution would be to make source an ISourceBlock<TSrc> too.
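If only the core libraries are allowed (as in the BONUS question), System.Threading.Channels, which ships with .NET Core 3.0 and later, can play the role of the TransformBlock; this is a swapped-in technique, not TPL Dataflow. A minimal sketch under that assumption: one async publisher per input writes into a single unbounded channel, the feeding runs on a separate task as suggested above, and the single subscriber reads with `await foreach`. ExecuteOrDownloadSomethingAsync is a hypothetical stand-in, and the int/string types are assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelPipeline
{
    // Hypothetical stand-in for ExecuteOrDownloadSomethingAsync: waits a
    // little (simulated I/O, no thread blocked) and returns a string.
    static async Task<string> ExecuteOrDownloadSomethingAsync(int s)
    {
        await Task.Delay(5 * (s % 4));
        return $"item {s}";
    }

    public static IAsyncEnumerable<string> Foo(IEnumerable<int> source)
    {
        var channel = Channel.CreateUnbounded<string>(
            new UnboundedChannelOptions { SingleReader = true });

        // Publishers run on a separate task, so the caller can start reading
        // results before every item has been processed.
        _ = Task.Run(async () =>
        {
            try
            {
                // One publisher per input item; each writes its result when done.
                var publishers = source.Select(async s =>
                    await channel.Writer.WriteAsync(await ExecuteOrDownloadSomethingAsync(s)));
                await Task.WhenAll(publishers);
                channel.Writer.Complete(); // no more items: ends the reader's loop
            }
            catch (Exception ex)
            {
                // Pass the failure to the subscriber instead of losing it.
                channel.Writer.TryComplete(ex);
            }
        });

        // Single subscriber: yields items in the order they were written.
        return channel.Reader.ReadAllAsync();
    }
}
```

Like the Unbounded MaxDegreeOfParallelism option above, this starts every item at once; throttling (for example with a SemaphoreSlim) would be a separate design decision.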
So it appears what you really want to do is to order a sequence of tasks based on when they complete. This is not terribly complex:
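using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class TaskExtensions
{
    public static IEnumerable<Task<T>> Order<T>(this IEnumerable<Task<T>> tasks)
    {
        var input = tasks.ToList();
        // ToList() matters: a lazy Select would create new completion sources
        // on every enumeration, and the returned tasks would never complete.
        var output = input.Select(task => new TaskCompletionSource<T>()).ToList();
        var collection = new BlockingCollection<TaskCompletionSource<T>>();
        foreach (var tcs in output)
            collection.Add(tcs);

        foreach (var task in input)
        {
            task.ContinueWith(t =>
            {
                // The next task to finish takes the next unused completion source
                var tcs = collection.Take();
                switch (t.Status)
                {
                    case TaskStatus.Canceled:
                        tcs.TrySetCanceled();
                        break;
                    case TaskStatus.Faulted:
                        tcs.TrySetException(t.Exception.InnerExceptions);
                        break;
                    case TaskStatus.RanToCompletion:
                        tcs.TrySetResult(t.Result);
                        break;
                }
            }
            , CancellationToken.None
            , TaskContinuationOptions.ExecuteSynchronously
            , TaskScheduler.Default);
        }
        return output.Select(tcs => tcs.Task);
    }
}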
So here we create a TaskCompletionSource for each input task, then go through each of the tasks and set a continuation which grabs the next completion source from a BlockingCollection and sets its result. The first task to complete grabs the first tcs that was returned, the second task to complete gets the second tcs, and so on.
Now your code becomes quite simple:
var tasks = collection.Select(item => LongRunningOperationThatReturnsTask(item))
                      .Order();
foreach (var task in tasks)
{
    var result = task.Result; // or you could `await` each result
    //....
}
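The continuation logic of Order can also be sketched without a BlockingCollection: the k-th continuation to run only needs the k-th completion source, so an atomic counter (Interlocked.Increment) can hand out the slots. OrderByCompletion is a hypothetical name, and this is a variant of the idea rather than the answer's original code.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class TaskOrdering
{
    public static IEnumerable<Task<T>> OrderByCompletion<T>(this IEnumerable<Task<T>> tasks)
    {
        var input = tasks.ToList();
        // Materialized once, so every caller sees the same completion sources.
        var output = input.Select(_ => new TaskCompletionSource<T>()).ToArray();
        int nextSlot = -1;

        foreach (var task in input)
        {
            task.ContinueWith(t =>
            {
                // Atomically claim the next slot: the k-th task to finish gets output[k].
                var tcs = output[Interlocked.Increment(ref nextSlot)];
                if (t.IsCanceled) tcs.TrySetCanceled();
                else if (t.IsFaulted) tcs.TrySetException(t.Exception.InnerExceptions);
                else tcs.TrySetResult(t.Result);
            },
            CancellationToken.None,
            TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);
        }

        return output.Select(tcs => tcs.Task);
    }
}
```

Consumed with `await`, the results come out in completion order, e.g. tasks that finish after 300, 100 and 200 ms yield 100, 200, 300.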
The asynchronous library made by the Microsoft Robotics team, the Concurrency and Coordination Runtime (CCR), had concurrency primitives that allowed an iterator to yield asynchronous code.
The CCR is now free (it didn't use to be). A nice introductory article is Concurrent Affairs.
Perhaps you can use this library alongside the .NET Task library, or it'll inspire you to 'roll your own'.