If possible, I want to create an async enumerator for tasks launched in parallel, so that the first task to complete is the first element of the enumeration, the second to finish is the second element, and so on.
public static async IAsyncEnumerable<T> ParallelEnumerateAsync<T>(this IEnumerable<Task<T>> coldAsyncTasks)
{
// ...
}
I bet there is a way using ContinueWith and a Queue<T>, but I don't completely trust myself to implement it.
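A minimal sketch of that ContinueWith idea, assuming a Channel<Task<T>> from System.Threading.Channels stands in for the Queue<T> so that the consumer can await new arrivals. The names CompletionOrderSketch and EnumerateByCompletionAsync are hypothetical, and cancellation is not handled.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class CompletionOrderSketch
{
    // Hypothetical helper: yields results in the order the tasks complete.
    public static async IAsyncEnumerable<T> EnumerateByCompletionAsync<T>(
        this IEnumerable<Task<T>> coldAsyncTasks)
    {
        // Materializing the sequence creates (and thereby starts) every task up front.
        var tasks = coldAsyncTasks.ToList();
        var channel = Channel.CreateUnbounded<Task<T>>();
        int remaining = tasks.Count;
        if (remaining == 0) channel.Writer.Complete();

        foreach (var task in tasks)
        {
            // Each continuation posts its finished task to the channel;
            // the last one to finish closes the channel.
            _ = task.ContinueWith(t =>
            {
                channel.Writer.TryWrite(t);
                if (Interlocked.Decrement(ref remaining) == 0)
                    channel.Writer.Complete();
            }, TaskScheduler.Default);
        }

        await foreach (var completed in channel.Reader.ReadAllAsync())
            yield return await completed; // rethrows here if the task faulted
    }
}
```

Every task is started before the first result is awaited, so this variant has no limit on concurrency.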
Here is a version that also allows specifying the maximum degree of parallelism. The idea is that the tasks are enumerated with a lag. For example, for degreeOfParallelism: 4 the first 4 tasks are enumerated immediately, causing them to be created, and then the first one of these is awaited. Next the 5th task is enumerated and the 2nd is awaited, and so on.
To keep things tidy, the Lag method is embedded inside the ParallelEnumerateAsync method as a static local function (a new feature of C# 8).
Since the enumeration of the coldTasks enumerable will most probably be driven from multiple threads, it is enumerated using a thread-safe wrapper.
Is this what you're looking for?
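The lagged enumeration described above could be sketched roughly as follows. This is an illustration under assumptions, not the original code: the class name LagSketch and the exact shape of Lag are hypothetical, and the thread-safe wrapper mentioned in the text is left out for brevity.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class LagSketch
{
    public static async IAsyncEnumerable<T> ParallelEnumerateAsync<T>(
        this IEnumerable<Task<T>> coldTasks, int degreeOfParallelism)
    {
        if (degreeOfParallelism < 1)
            throw new ArgumentOutOfRangeException(nameof(degreeOfParallelism));

        // A lag of (degreeOfParallelism - 1) keeps that many extra tasks
        // created and running while the oldest one is being awaited.
        foreach (var task in Lag(coldTasks, degreeOfParallelism - 1))
            yield return await task.ConfigureAwait(false);

        // Static local function (C# 8): returns each item only after
        // `count` further items have been pulled from the source.
        static IEnumerable<TItem> Lag<TItem>(IEnumerable<TItem> source, int count)
        {
            var queue = new Queue<TItem>();
            foreach (var item in source)
            {
                queue.Enqueue(item);
                if (queue.Count > count) yield return queue.Dequeue();
            }
            // The source is exhausted: drain whatever is still buffered.
            while (queue.Count > 0) yield return queue.Dequeue();
        }
    }
}
```

Note that results come out in the order the tasks were created, and that the source must be a lazy (cold) sequence for the limit to have any effect.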
If I understand your question right, your focus is to launch all tasks, let them all run in parallel, but make sure the return values are processed in the same order as the tasks were launched.
Going by the specs, with C# 8.0 asynchronous streams, queuing tasks for parallel execution but sequential return can look like this.
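A minimal sketch of such a method, assuming the tasks arrive as a lazy IEnumerable<Task<T>>. Only the name RunAndPreserveOrderAsync<T> comes from the text; the class name PreserveOrderSketch and the method body are an illustration, not the original code.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class PreserveOrderSketch
{
    public static async IAsyncEnumerable<T> RunAndPreserveOrderAsync<T>(
        IEnumerable<Task<T>> tasks)
    {
        // The Queue constructor enumerates `tasks` completely, so every
        // task is created and running before the first await below.
        var queue = new Queue<Task<T>>(tasks);

        // Await the tasks in launch order; a slow early task delays the
        // delivery of later results, but not their execution.
        while (queue.Count > 0)
            yield return await queue.Dequeue();
    }
}
```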
Possible output:
On a practical note, there doesn't seem to be any new language-level support for this pattern, and besides, since asynchronous streams deal with IAsyncEnumerable<T>, a base Task would not work here and all the worker async methods should have the same Task<T> return type, which somewhat limits asynchronous streams-based design.
Because of this and depending on your situation (Do you want to be able to cancel long-running tasks? Is per-task exception handling required? Should there be a limit to the number of concurrent tasks?) it might make sense to check out @TheGeneral's suggestions up there.
Update:
Note that RunAndPreserveOrderAsync<T> does not necessarily have to use a Queue of tasks; this was only chosen to better show coding intentions.
Converting an enumerator to List would produce the same result; the body of RunAndPreserveOrderAsync<T> can be replaced with a single foreach line over the tasks converted to a List.
In this implementation it is important that all the tasks are generated and launched first, which is done along with Queue initialization or a conversion of the tasks enumerable to List. However, it might be hard to resist simplifying the above foreach line so that it iterates the tasks enumerable directly, which would cause the tasks to be executed sequentially rather than in parallel.
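The difference between the two foreach variants can be sketched as below. Both method bodies are assumptions reconstructed from the description above, and the names EagerVersusLazySketch and RunSequentiallyAsync are hypothetical.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class EagerVersusLazySketch
{
    // Eager: ToList() creates every task before the first await,
    // so the tasks run in parallel and are returned in launch order.
    public static async IAsyncEnumerable<T> RunAndPreserveOrderAsync<T>(
        IEnumerable<Task<T>> tasks)
    {
        foreach (var task in tasks.ToList()) yield return await task;
    }

    // Lazy: each task is created only when the loop reaches it, that is,
    // after the previous one has been awaited, so execution is sequential.
    public static async IAsyncEnumerable<T> RunSequentiallyAsync<T>(
        IEnumerable<Task<T>> tasks)
    {
        foreach (var task in tasks) yield return await task;
    }
}
```

The pitfall only shows up when the input is a lazy sequence, such as a Select projection or an iterator method; with an array or list of already started tasks both variants behave the same.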