Correct way to link Tasks together when return val

Posted 2019-08-15 07:09

Question:

I hope this makes sense. Suppose I have the following code:

Task.Run(() =>
{
    return Task.WhenAll
        (
            Task1,
            Task2,
            // ...
            Taskn
        )
        .ContinueWith(tsks =>
            {
                // TaskA (uses output from Task1 & Task2, say)
            }
        , ct)
        .ContinueWith(res =>
            {
                // TaskB (uses output from TaskA and Task3, say)
            }
        , ct);
});

So I want my first N tasks to run concurrently (they have no interdependencies) and then, only once they have all finished, to continue with a task that relies on their outputs (I understand I can use tsks.Result for this). BUT THEN I want to continue with a task that relies on one of the first tasks and on the result of TaskA.

I'm a bit lost on how to structure my code so that I can access the results of my first set of tasks outside the immediately following ContinueWith.

My one thought was to assign the return values to variables declared within my method, something like:

// ... declare variables outside of the tasks ...

Task.Run(() =>
{
    return Task.WhenAll
        (
            Task.Run(() => { var1 = Task1.Result; }, ct),
            // ...
            Task.Run(() => { varn = Taskn.Result; }, ct)
        )
        .ContinueWith(tsks =>
            {
                // TaskA (uses output from var1 & varn, say)
            }
        , ct)
        .ContinueWith(res =>
            {
                // TaskB (uses output from TaskA and var3, say)
            }
        , ct);
});

But even though this works for me, I have no doubt that I'm doing it wrong.

What is the correct way? Should I have a state object that contains all the necessary variables and pass it through all my tasks? Or is there a better way altogether?

Please forgive my ignorance here; I'm just VERY new to concurrent programming.

Answer 1:

Since Task1, Task2, ..., TaskN are in scope at the call to WhenAll, and because by the time ContinueWith passes control to your next task all the earlier tasks are guaranteed to have finished, it is safe to use TaskX.Result inside the code that implements the continuations:

.ContinueWith(tsks=>
        {
            var resTask1 = Task1.Result;
            // ...
        }
    , ct)

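You are guaranteed to get the result without blocking, because Task1 has already finished running. (If Task1 faulted or was canceled, Result throws an AggregateException instead of returning a value.)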
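A minimal, self-contained sketch of this approach applied to the question's scenario. The class ContinueWithDemo, the method RunPipeline and the placeholder values are hypothetical stand-ins for Task1..Task3, TaskA and TaskB; the point is that both continuations read the first-stage tasks' Result directly, with no shared variables or state object.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo: task1..task3 stand in for the question's Task1..Task3,
// and the two continuations stand in for TaskA and TaskB.
public static class ContinueWithDemo
{
    public static int RunPipeline(CancellationToken ct)
    {
        // First stage: independent tasks that run concurrently (placeholder work).
        Task<int> task1 = Task.Run(() => 2, ct);
        Task<int> task2 = Task.Run(() => 3, ct);
        Task<int> task3 = Task.Run(() => 10, ct);

        Task<int> pipeline = Task.WhenAll(task1, task2, task3)
            // TaskA: WhenAll has completed, so task1 and task2 have finished
            // and reading .Result here does not block.
            .ContinueWith(_ => task1.Result * task2.Result, ct)
            // TaskB: combines TaskA's result (the antecedent) with task3's result.
            .ContinueWith(taskA => taskA.Result + task3.Result, ct);

        // The only blocking wait is on the final task of the chain.
        return pipeline.Result;
    }

    public static void Main()
    {
        using (var cts = new CancellationTokenSource())
        {
            Console.WriteLine(RunPipeline(cts.Token)); // 2 * 3 + 10
        }
    }
}
```

If one of the first-stage tasks faults, its Result rethrows inside the continuation, so the error propagates down the chain rather than being silently lost. In C# 5 and later the same flow is commonly written with await Task.WhenAll(...) followed by reading (or awaiting) the individual tasks.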



Answer 2:

Here is a way to do it with ConcurrentDictionary, which sounds like it might be applicable in your use case. Also, since you're new to concurrency, it shows you the Interlocked class as well:

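using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

class Program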
{
    static void Main(string[] args)
    {
        Console.WriteLine("Executing...");

        var numOfTasks = 50;
        var tasks = new List<Task>();
        for (int i = 0; i < numOfTasks; i++)
        {
            var iTask = Task.Run(() =>
            {
                var counter = Interlocked.Increment(ref _Counter);

                Console.WriteLine(counter);

                if (counter == numOfTasks - 1)
                {
                    Console.WriteLine("Waiting {0} ms", 5000);
                    Task.Delay(5000).Wait(); // to simulate a longish running task
                }

                _State.AddOrUpdate(counter, "Updated Yo!", (k, v) =>
                {
                    throw new InvalidOperationException("This shouldn't occur more than once.");
                });
            });
            tasks.Add(iTask);
        }

        Task.WhenAll(tasks)
            .ContinueWith(t =>
            {
                var longishState = _State[numOfTasks - 1];
                Console.WriteLine(longishState);
                Console.WriteLine("Complete. longishState: " + longishState);
            });

        Console.ReadKey();
    }

    static int _Counter = -1;
    static ConcurrentDictionary<int, string> _State = new ConcurrentDictionary<int, string>();
}

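The output contains the counter values 0 through 49 in a nondeterministic order, a "Waiting 5000 ms" line, and finally the continuation's "Updated Yo!" and "Complete. longishState: Updated Yo!" lines (though the Waiting line won't always be the last one printed before the continuation).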
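To tie this back to the question, here is a minimal sketch of the same ConcurrentDictionary idea applied to the TaskA/TaskB chain. The class DictionaryDemo, the method RunPipeline and the placeholder work (n * 10) are hypothetical; each first-stage task writes its result under its own key, and the continuations read those keys once WhenAll has completed.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical demo: first-stage results are stored in a thread-safe dictionary
// keyed by task number, then read by the TaskA and TaskB continuations.
public static class DictionaryDemo
{
    public static int RunPipeline()
    {
        var results = new ConcurrentDictionary<int, int>();

        // First stage: three independent tasks (placeholder work: n * 10).
        Task[] firstStage = Enumerable.Range(1, 3)
            .Select(n => Task.Run(() =>
            {
                // TryAdd returns false if the key is already present.
                if (!results.TryAdd(n, n * 10))
                    throw new InvalidOperationException($"Result {n} was written twice.");
            }))
            .ToArray();

        Task<int> pipeline = Task.WhenAll(firstStage)
            .ContinueWith(_ => results[1] + results[2])          // TaskA: needs results 1 and 2
            .ContinueWith(taskA => taskA.Result * results[3]);   // TaskB: needs TaskA and result 3

        return pipeline.Result;
    }

    public static void Main() => Console.WriteLine(RunPipeline());
}
```

The continuations only run after WhenAll completes, so every key they read has already been written; a ConcurrentDictionary (rather than a plain Dictionary) is still needed because the first-stage tasks write to it concurrently.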



Answer 3:

An elegant way to solve this is to use the Barrier class.

Like this:

var nrOfTasks = ... ;
ConcurrentDictionary<int, ResultType> Results = new ConcurrentDictionary<int, ResultType>();

var barrier = new Barrier(nrOfTasks, (b) =>
{
    // here goes the work of TaskA
    // and immediately after it
    // here goes the work of TaskB, having the results of TaskA and any other task you might need
});

Task.Run(() => { Results[1] = Task1.Result; barrier.SignalAndWait(); }, ct);
// ...
Task.Run(() => { Results[nrOfTasks] = Taskn.Result; barrier.SignalAndWait(); }, ct);
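A runnable sketch of the Barrier approach, assuming three first-stage tasks with placeholder work (n * 10). The class BarrierDemo, the method RunPipeline and the finalResult variable are hypothetical; the Barrier's post-phase action plays the role of TaskA followed by TaskB.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo of the Barrier approach with three first-stage tasks.
public static class BarrierDemo
{
    public static int RunPipeline()
    {
        const int nrOfTasks = 3;
        var results = new ConcurrentDictionary<int, int>();
        int finalResult = 0;

        // The post-phase action runs once, after all nrOfTasks participants
        // have called SignalAndWait, and before any of them is released.
        using (var barrier = new Barrier(nrOfTasks, b =>
        {
            int taskA = results[1] + results[2]; // work of TaskA
            finalResult = taskA * results[3];    // work of TaskB
        }))
        {
            var tasks = new Task[nrOfTasks];
            for (int i = 1; i <= nrOfTasks; i++)
            {
                int n = i; // copy the loop variable for the closure
                tasks[n - 1] = Task.Run(() =>
                {
                    results[n] = n * 10;     // placeholder first-stage work
                    barrier.SignalAndWait(); // blocks this thread until every participant arrives
                });
            }

            // Wait for all participants (and therefore the post-phase action) to finish.
            Task.WaitAll(tasks);
        }

        return finalResult;
    }

    public static void Main() => Console.WriteLine(RunPipeline());
}
```

Note that SignalAndWait blocks one thread per participant until the last one arrives, so with many tasks this ties up thread-pool threads; the continuation-based approach in Answer 1 avoids that blocking.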