Run a sequence of tasks, one after the other

Posted 2020-02-28 10:16

I have a sequence of tasks, where each one depends on the output of the previous one. I'd like to represent this as a single Task object, whose result is the output of the last task in the sequence. (If the tasks didn't depend on one another, I could run them in parallel and use TaskFactory.ContinueWhenAll.)

I'd like to be able to implement this method:

static Task<TState> AggregateAsync<T, TState>(
    IEnumerable<T> items,
    TState initial,
    Func<TState, T, Task<TState>> makeTask);

How can I efficiently run the tasks one after another in sequence? I'm using C# 4.0 so I can't use async/await to do it for me.

Edit: I could write AggregateAsync like this:

static Task<TState> AggregateAsync<T, TState>(IEnumerable<T> items, TState initial, Func<TState, T, Task<TState>> makeTask)
{
    var initialTask = Task.Factory.StartNew(() => initial);
    return items.Aggregate(
        initialTask,
        (prevTask, item) =>
            {
                prevTask.Wait(); // synchronous blocking here?
                return makeTask(prevTask.Result, item);
            });
}

But surely I'll get a batch of tasks, each of which blocks synchronously waiting for the one before it?

2 Answers
一夜七次
#2 · 2020-02-28 11:11

The easy way (using Microsoft.Bcl.Async):

static async Task<TState> AggregateAsync<T, TState>(
    this IEnumerable<T> items,
    TState initial,
    Func<TState, T, Task<TState>> makeTask)
{
  var state = initial;
  foreach (var item in items)
    state = await makeTask(state, item);
  return state;
}

The hard way:

static Task<TState> AggregateAsync<T, TState>(
    this IEnumerable<T> items,
    TState initial,
    Func<TState, T, Task<TState>> makeTask)
{
  var tcs = new TaskCompletionSource<TState>();
  tcs.SetResult(initial);
  Task<TState> ret = tcs.Task;
  foreach (var item in items)
  {
    var localItem = item;
    ret = ret.ContinueWith(t => makeTask(t.Result, localItem)).Unwrap();
  }
  return ret;
}
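
For illustration, here is a minimal sketch of how the "hard way" might be called. The class name SequenceDemo and the AddAsync step are hypothetical and not part of the original answer; AddAsync just adds one number to a running total on a thread-pool thread. The AggregateAsync body is the same continuation chain as above, repeated only so the example compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class SequenceDemo
{
    // Same continuation-chaining technique as the "hard way" above.
    public static Task<TState> AggregateAsync<T, TState>(
        this IEnumerable<T> items,
        TState initial,
        Func<TState, T, Task<TState>> makeTask)
    {
        var tcs = new TaskCompletionSource<TState>();
        tcs.SetResult(initial);
        Task<TState> ret = tcs.Task;
        foreach (var item in items)
        {
            var localItem = item; // copy: C# 4.0 shares one foreach variable across iterations
            ret = ret.ContinueWith(t => makeTask(t.Result, localItem)).Unwrap();
        }
        return ret;
    }

    // Hypothetical step: adds n to the running total on a thread-pool thread.
    public static Task<int> AddAsync(int total, int n)
    {
        return Task.Factory.StartNew(() => total + n);
    }

    public static void Main()
    {
        Task<int> sum = new[] { 1, 2, 3, 4 }
            .AggregateAsync(0, (total, n) => AddAsync(total, n));

        // The only blocking call is here, on the final task of the chain.
        Console.WriteLine(sum.Result);
    }
}
```

Each step is started only when the previous one has completed, so no thread sits blocked between steps; the caller decides whether to block on the final task or attach a further continuation to it.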

Note that error handling is more awkward with the "hard" way; an exception from the first item will be wrapped in an AggregateException by each successive item. The "easy" way does not wrap exceptions like this.
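
If the nesting is a problem, one option (my assumption, not something the answer above proposes) is to call AggregateException.Flatten() where the final task is observed. The sketch below uses a simplified, hypothetical chain of plain ContinueWith calls rather than AggregateAsync itself, just to show the wrapping and the flattening; FlattenSketch and BuildFaultedChain are made-up names.

```csharp
using System;
using System.Threading.Tasks;

public static class FlattenSketch
{
    // Hypothetical chain: the first step throws, then two continuations follow.
    public static Task<int> BuildFaultedChain()
    {
        Task<int> chain = Task.Factory.StartNew<int>(
            () => { throw new InvalidOperationException("step 1 failed"); });

        for (int i = 0; i < 2; i++)
        {
            // Reading t.Result on a faulted task rethrows its AggregateException,
            // so every continuation adds one more layer of wrapping.
            chain = chain.ContinueWith(t => t.Result + 1);
        }
        return chain;
    }

    public static void Main()
    {
        try
        {
            BuildFaultedChain().Wait();
        }
        catch (AggregateException ex)
        {
            // Flatten() collapses the nested AggregateExceptions into one level.
            foreach (Exception inner in ex.Flatten().InnerExceptions)
                Console.WriteLine(inner.GetType().Name + ": " + inner.Message);
        }
    }
}
```

After flattening, the original InvalidOperationException should be directly reachable through InnerExceptions instead of sitting several layers deep.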

兄弟一词,经得起流年.
#3 · 2020-02-28 11:12

You can use Task.ContinueWith. The task parameter of the continuation in the code below represents the previous (completed) task; you can read its result to start the second task, and so on.

T item1 = default(T);
T item2 = default(T);
Task<TState> task1 = makeTask(initial, item1);

//create second task
task1.ContinueWith(task => makeTask(task.Result, item2).Result,
                     TaskContinuationOptions.OnlyOnRanToCompletion);

Edit

Sorry, I missed this part:

"I'd like to represent this as a single Task object, whose result is the output of the end of the sequence."

To do that, keep a reference to the task returned by the last ContinueWith call and return it.

Task<TState> aggregate = task1.ContinueWith(
                                  task => makeTask(task.Result, item2).Result,
                                  TaskContinuationOptions.OnlyOnRanToCompletion);

var res = aggregate.Result; //wait synchronously for the result of the sequence