Using ContinueWith with Multiple Tasks

Posted 2019-08-24 11:03

Question:

This is not behaving quite as I thought it would. The need is simple: launch a number of tasks to do operations on an object, one unique object per task. The second part is a ContinueWith that runs when each task reports its results. However, I am not getting WhenAll-type behavior. Hopefully someone can set me straight.

_tasks = new Task<AnalysisResultArgs>[_beansList.Count];
for (int loopCnt = 0; loopCnt < _beansList.Count; loopCnt++)
{
    _tasks[loopCnt] = Task<AnalysisResultArgs>.Factory.StartNew(() =>
    {
        return _beansList[loopCnt].Analyze(newBeanData);
    });
    await _tasks[loopCnt].ContinueWith(ReportResults,
                  TaskContinuationOptions.RunContinuationsAsynchronously);
    // do some housekeeping when all tasks are complete          
}

private void ReportResults(Task<AnalysisResultArgs> task)
{
     /* Do some serial operations */
}

It was my understanding that _beansList.Count tasks would be launched and that, by using await on the ContinueWith, the housekeeping work would not execute until all the tasks had completed. I cannot block, because I need to throttle the incoming data to prevent far too many tasks from waiting to be executed.

Where did I goof? The await actually completes and the housekeeping gets run even though not all of the tasks have run to completion.

Answer 1:

You are not awaiting all the tasks; you are awaiting each continuation inside the loop, one iteration at a time. Use the Task.WhenAll method for this instead. Also, why do you need the continuation if you can run the reporting inside the task? Simplify your code like this:

private void ReportResults(AnalysisResultArgs results)
{
     /* Do some serial operations */
}

...
_tasks = new Task<AnalysisResultArgs>[_beansList.Count];
for (int loopCnt = 0; loopCnt < _beansList.Count; loopCnt++)
{
    var count = loopCnt;
    _tasks[count] = Task.Run(() =>
    {
        var results = _beansList[count].Analyze(newBeanData);
        ReportResults(results);
        return results;
    });
}

await Task.WhenAll(_tasks);
// do some housekeeping now that all tasks are complete
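
If the ContinueWith from the question should be kept, one possible approach is to store each continuation task instead of awaiting it inside the loop, and then await Task.WhenAll once after the loop. The following is only a minimal sketch of that idea: Bean, ContinuationDemo, AnalyzeAllAsync, Reported, and the int result type are hypothetical stand-ins for the asker's types, not part of the original code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for the asker's bean type.
public class Bean
{
    public int Id { get; }
    public Bean(int id) { Id = id; }

    // Placeholder for the real analysis work.
    public int Analyze(int newBeanData) => Id * newBeanData;
}

public static class ContinuationDemo
{
    // Collects reported results; thread-safe because continuations may run in parallel.
    public static readonly ConcurrentBag<int> Reported = new ConcurrentBag<int>();

    private static void ReportResults(Task<int> task) => Reported.Add(task.Result);

    public static async Task<int[]> AnalyzeAllAsync(Bean[] beans, int newBeanData)
    {
        var analyses = new Task<int>[beans.Length];
        var reports = new Task[beans.Length];

        for (int i = 0; i < beans.Length; i++)
        {
            // Copy the loop variable: the lambda may run after i has changed.
            var index = i;
            analyses[index] = Task.Run(() => beans[index].Analyze(newBeanData));
            // Keep the continuation task instead of awaiting it inside the loop.
            reports[index] = analyses[index].ContinueWith(ReportResults, TaskScheduler.Default);
        }

        // Completes only after every continuation has finished.
        await Task.WhenAll(reports);

        // Housekeeping goes here; every analysis task is complete at this point.
        return analyses.Select(t => t.Result).ToArray();
    }

    public static async Task Main()
    {
        var beans = Enumerable.Range(1, 5).Select(id => new Bean(id)).ToArray();
        var results = await AnalyzeAllAsync(beans, 10);
        Console.WriteLine(string.Join(", ", results));
    }
}
```

Because a continuation completes only after its antecedent task, waiting on the continuations also covers the analysis tasks themselves.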


Answer 2:

As @Stephen has already mentioned, the code in the question is not minimal, complete, and verifiable. I have taken the liberty of making some assumptions; in my view, this is how your code should look:

public async Task<AnalysisResultArgs[]> MainMethod()
{
    var _beansList = new List<AnalysisResultArgs>();

    for (int i = 0; i < 100; i++) // Considering 100 records
        _beansList.Add(new AnalysisResultArgs());

    var _tasks = new Task<AnalysisResultArgs>[_beansList.Count];

    for (int loopCnt = 0; loopCnt < _beansList.Count; loopCnt++)
    {
        var local = loopCnt;
        _tasks[local] = Task.Run(async() => await ReportResults(_beansList[local].Analyze(new AnalysisResultArgs())));
    }

    return await Task.WhenAll(_tasks);
}

private async Task<AnalysisResultArgs> ReportResults(Task<AnalysisResultArgs> task)
{
    var result = await task; // wait for the analysis to finish before reporting
    await Task.Delay(1000);  // placeholder for the actual reporting logic
    return result;
}

public class AnalysisResultArgs
{   
    public async Task<AnalysisResultArgs> Analyze(AnalysisResultArgs newBeanData)
    {
        await Task.Delay(1000);
        return await Task.FromResult(new AnalysisResultArgs());
    }
}

Assumptions / Other details:

  1. Task.Delay and Task.FromResult are mere placeholders for the actual logic.
  2. All methods in the call chain, Analyze and ReportResults, are async.
  3. The code in the loop could have been:

    _tasks[local] = ReportResults(_beansList[local].Analyze(new AnalysisResultArgs()));

but if we use Task.Run to start a task, then async/await helps free up the calling thread pool thread / synchronization context, which improves the system's scalability.

  4. _beansList is assumed to be of type List<AnalysisResultArgs>, but that can be changed based on the actual requirement.
  5. ReportResults could be modified to take a Func<Task<AnalysisResultArgs>> func instead of a plain Task<AnalysisResultArgs>, doing await func() inside the method; the calling code would then be truly async, as follows:

      _tasks[local] = Task.Run(async() => await ReportResults(async() => await _beansList[local].Analyze(new AnalysisResultArgs())));
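
The Func-based variant described in the last item might look roughly like the self-contained sketch below. It rests on assumptions: the Value property, the FuncDemo class, and the RunAsync method are hypothetical additions made only so that the result can be inspected, and the 10 ms delay is a placeholder for real work.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public class AnalysisResultArgs
{
    // Hypothetical payload so the output can be inspected.
    public int Value { get; set; }

    public async Task<AnalysisResultArgs> Analyze(AnalysisResultArgs newBeanData)
    {
        await Task.Delay(10); // placeholder for real asynchronous work
        return new AnalysisResultArgs { Value = Value + newBeanData.Value };
    }
}

public static class FuncDemo
{
    // Takes a factory, so the analysis starts only when ReportResults invokes it.
    private static async Task<AnalysisResultArgs> ReportResults(Func<Task<AnalysisResultArgs>> func)
    {
        var result = await func();
        // Placeholder for the serial reporting step.
        return result;
    }

    public static Task<AnalysisResultArgs[]> RunAsync(int count)
    {
        var beans = Enumerable.Range(0, count)
            .Select(i => new AnalysisResultArgs { Value = i })
            .ToList();
        var newBeanData = new AnalysisResultArgs { Value = 100 };

        // Each lambda receives its own bean, so no loop-variable copy is needed here.
        var tasks = beans
            .Select(bean => Task.Run(() => ReportResults(() => bean.Analyze(newBeanData))))
            .ToArray();

        // Task.WhenAll returns the results in the same order as the input tasks.
        return Task.WhenAll(tasks);
    }

    public static async Task Main()
    {
        var results = await RunAsync(3);
        Console.WriteLine(string.Join(", ", results.Select(r => r.Value)));
    }
}
```

Passing a Func rather than an already-started Task means the caller decides when the analysis begins, which is what makes it possible to await it inside ReportResults.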