Here is the code involved:
private static async Task DoRunInOrderAsync<TTaskSeed>(SemaphoreSlim sem, IObservable<TTaskSeed> taskSeedSource, CreateTaskDelegate<TTaskSeed> createTask, OnTaskErrorDelegate<TTaskSeed> onFailed, OnTaskSuccessDelegate<TTaskSeed> onSuccess) where TTaskSeed : class
{
var tasks = await taskSeedSource
.Select(taskSeed => GetPendingOrRunningTask(taskSeed, createTask, onFailed, onSuccess, sem))
.ToList()
.ToTask();
await Task.WhenAll(tasks);
}
private static async Task GetPendingOrRunningTask<T>(T taskSeed, CreateTaskDelegate<T> createTask, OnTaskErrorDelegate<T> onFailed, OnTaskSuccessDelegate<T> onSuccess,
SemaphoreSlim sem) where T : class
{
Exception exc = null;
await sem.WaitAsync();
try
{
var task = createTask(taskSeed);
if (task != null)
{
await task;
}
onSuccess(task, taskSeed);
}
catch (Exception e)
{
exc = e;
}
sem.Release();
if (exc != null)
{
onFailed(exc, taskSeed);
}
}
Where:
Select
isIObservable<TResult> Select<TSource, TResult>(this IObservable<TSource> source, Func<TSource, TResult> selector)
fromSystem.Reactive.Linq.Observable
ToList
isIObservable<IList<TSource>> ToList<TSource>(this IObservable<TSource> source)
fromSystem.Reactive.Linq.Observable
ToTask
isTask<TResult> ToTask<TResult>(this IObservable<TResult> observable)
fromSystem.Reactive.Threading.Tasks.TaskObservableExtensions
- System.Reactive.Linq version is 2.2.5.0
As far as I can see, everything is built, no stale binaries around. The error occurs often, but not always.
For the life of me, I cannot understand how the tasks
list can contain null
if the GetPendingOrRunningTask
method is async Task
?
EDIT
So ToList
injects null
. How? Why? What am I doing wrong (besides programming for a living) ?
You may have a race condition.
.ToList()
calls theList<T>
constructor. The code is here.If the number of elements in your
taskSeedSource
changes between the time the constructor starts and finishes, it's possible that you could end up with an inconsistent list. In particular look at line 94.