TaskContinuationOptions.RunContinuationsAsynchronously

Posted 2019-03-12 20:22

Question:

In this blog post, Stephen Toub describes a new feature that will be included in .NET 4.6, which adds another value to the TaskCreationOptions and TaskContinuationOptions enums called RunContinuationsAsynchronously.

He explains:

"I talked about a ramification of calling {Try}Set* methods on TaskCompletionSource, that any synchronous continuations off of the TaskCompletionSource’s Task could run synchronously as part of the call. If we were to invoke SetResult here while holding the lock, then synchronous continuations off of that Task would be run while holding the lock, and that could lead to very real problems. So, while holding the lock we grab the TaskCompletionSource to be completed, but we don’t complete it yet, delaying doing so until the lock has been released"

And gives the following example to demonstrate:

private SemaphoreSlim _gate = new SemaphoreSlim(1, 1);
private async Task WorkAsync()
{
    await _gate.WaitAsync().ConfigureAwait(false);
    try
    {
        // work here
    }
    finally { _gate.Release(); }
}

Now imagine that you have lots of calls to WorkAsync:

await Task.WhenAll(from i in Enumerable.Range(0, 10000) select WorkAsync());

We've just created 10,000 calls to WorkAsync that will be appropriately serialized on the semaphore. One of the tasks will enter the critical region, and the others will queue up on the WaitAsync call, inside SemaphoreSlim effectively enqueueing the task to be completed when someone calls Release. If Release completed that Task synchronously, then when the first task calls Release, it'll synchronously start executing the second task, and when it calls Release, it'll synchronously start executing the third task, and so on. If the "//work here" section of code above didn't include any awaits that yielded, then we're potentially going to stack dive here and eventually potentially blow out the stack.

I'm having a hard time grasping the part where he talks about executing the continuation synchronously.

Question

How could this possibly cause a stack dive? Moreover, what does RunContinuationsAsynchronously effectively do to solve that problem?

Answer 1:

The key concept here is that a task's continuation may run synchronously on the same thread that completed the antecedent task.

Let's imagine that this is SemaphoreSlim.Release's implementation (it's actually Toub's AsyncSemaphore's):

public void Release() 
{ 
    TaskCompletionSource<bool> toRelease = null; 
    lock (m_waiters) 
    { 
        if (m_waiters.Count > 0) 
            toRelease = m_waiters.Dequeue(); 
        else 
            ++m_currentCount; 
    } 
    if (toRelease != null) 
        toRelease.SetResult(true); 
}

We can see that it completes a task synchronously (using TaskCompletionSource). Suppose WorkAsync has no other asynchronous points (i.e. no awaits at all, or every await is on an already completed task). Then calling _gate.Release() may complete a pending call to _gate.WaitAsync() synchronously on the same thread, and you may reach a state in which a single thread releases the semaphore, resumes the next pending call, executes // work here, releases the semaphore again, and so on.

This means that the same thread goes deeper and deeper into the stack with every waiter it resumes, hence the stack dive.
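To make the nesting visible, here is a minimal sketch that chains waiters the same way: each one awaits its own TaskCompletionSource and then completes the next one from inside its continuation. The names StackDiveSketch, WaiterAsync and MeasureMaxDepth are made up for illustration, and the chain length of 50 is arbitrary. It assumes a plain console application (no SynchronizationContext) on .NET 4.6 or later.

```csharp
using System;
using System.Threading.Tasks;

static class StackDiveSketch
{
    [ThreadStatic] static int t_depth;            // nesting depth on the current thread
    static int s_maxDepth;                        // deepest nesting seen on any thread
    static readonly object s_sync = new object();

    // Hypothetical waiter: resumes when gate i completes, then completes gate i + 1
    // from inside its own continuation (like Release() resuming the next WorkAsync).
    static async Task WaiterAsync(TaskCompletionSource<bool>[] gates, int i)
    {
        await gates[i].Task.ConfigureAwait(false);

        t_depth++;
        lock (s_sync) { if (t_depth > s_maxDepth) s_maxDepth = t_depth; }

        if (i + 1 < gates.Length)
            gates[i + 1].SetResult(true);         // may run the next waiter right here, nested

        t_depth--;
    }

    static int MeasureMaxDepth(TaskCreationOptions options, int count)
    {
        lock (s_sync) s_maxDepth = 0;

        var gates = new TaskCompletionSource<bool>[count];
        var waiters = new Task[count];
        for (int i = 0; i < count; i++)
            gates[i] = new TaskCompletionSource<bool>(options);
        for (int i = 0; i < count; i++)
            waiters[i] = WaiterAsync(gates, i);   // each waiter parks at its await

        gates[0].SetResult(true);                 // kick off the chain
        Task.WaitAll(waiters);

        lock (s_sync) return s_maxDepth;
    }

    static void Main()
    {
        Console.WriteLine("default options: max nesting = " +
            MeasureMaxDepth(TaskCreationOptions.None, 50));
        Console.WriteLine("RunContinuationsAsynchronously: max nesting = " +
            MeasureMaxDepth(TaskCreationOptions.RunContinuationsAsynchronously, 50));
    }
}
```

With the default options the nesting figure should grow with the chain length, because every SetResult call runs the next waiter before it returns. With RunContinuationsAsynchronously each waiter should start from a fresh stack, so the figure should stay at 1.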

RunContinuationsAsynchronously makes sure the continuation doesn't run synchronously, so the thread that releases the semaphore moves on and the continuation is scheduled to run separately, possibly on another thread (which one depends on the other continuation parameters, e.g. the TaskScheduler).

This logically resembles posting the completion to the ThreadPool:

public void Release() 
{ 
    TaskCompletionSource<bool> toRelease = null; 
    lock (m_waiters) 
    { 
        if (m_waiters.Count > 0) 
            toRelease = m_waiters.Dequeue(); 
        else 
            ++m_currentCount; 
    } 
    if (toRelease != null) 
        Task.Run(() => toRelease.SetResult(true)); 
}
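Note that the option is supplied when the TaskCompletionSource is created, not when it is completed, so with the real feature Release stays unchanged and the change goes into the method that enqueues the waiter. The following is only a sketch: the constructor, WaitAsync and s_completed are assumptions filled in around the Release method shown above, and none of this is SemaphoreSlim's actual code.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Sketch of an AsyncSemaphore-style class; everything except Release()
// is an assumed reconstruction for illustration.
public class AsyncSemaphore
{
    private static readonly Task s_completed = Task.FromResult(true);
    private readonly Queue<TaskCompletionSource<bool>> m_waiters =
        new Queue<TaskCompletionSource<bool>>();
    private int m_currentCount;

    public AsyncSemaphore(int initialCount)
    {
        m_currentCount = initialCount;
    }

    public Task WaitAsync()
    {
        lock (m_waiters)
        {
            if (m_currentCount > 0)
            {
                --m_currentCount;
                return s_completed;               // no waiting needed
            }

            // The only change: continuations of this task will never be
            // run inline by whoever calls SetResult on it.
            var waiter = new TaskCompletionSource<bool>(
                TaskCreationOptions.RunContinuationsAsynchronously);
            m_waiters.Enqueue(waiter);
            return waiter.Task;
        }
    }

    public void Release()
    {
        TaskCompletionSource<bool> toRelease = null;
        lock (m_waiters)
        {
            if (m_waiters.Count > 0)
                toRelease = m_waiters.Dequeue();
            else
                ++m_currentCount;
        }
        if (toRelease != null)
            toRelease.SetResult(true);            // returns without running the waiter inline
    }
}
```

Compared with wrapping SetResult in Task.Run, the task itself is completed on the releasing thread and only its continuations are dispatched asynchronously.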


Answer 2:

How could this possibly cause a stack dive? Moreover, what does RunContinuationsAsynchronously effectively do to solve that problem?

i3arnon provides a very good explanation of the reasons behind introducing RunContinuationsAsynchronously. My answer is rather orthogonal to his; in fact, I'm writing this for my own reference as well (I myself ain't gonna remember any subtleties of this in half a year from now :)

First of all, let's see how TaskCompletionSource's RunContinuationsAsynchronously option is different from Task.Run(() => tcs.SetResult(result)) or the like. Let's try a simple console application:

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplications
{
    class Program
    {
        static void Main(string[] args)
        {
            ThreadPool.SetMinThreads(100, 100);

            Console.WriteLine("start, " + new { System.Environment.CurrentManagedThreadId });

            var tcs = new TaskCompletionSource<bool>();

            // test ContinueWith-style continuations (TaskContinuationOptions.ExecuteSynchronously)
            ContinueWith(1, tcs.Task);
            ContinueWith(2, tcs.Task);
            ContinueWith(3, tcs.Task);

            // test await-style continuations
            ContinueAsync(4, tcs.Task);
            ContinueAsync(5, tcs.Task);
            ContinueAsync(6, tcs.Task);

            Task.Run(() =>
            {
                Console.WriteLine("before SetResult, " + new { System.Environment.CurrentManagedThreadId });
                tcs.TrySetResult(true);
                Thread.Sleep(10000);
            });
            Console.ReadLine();
        }

        // log
        static void Continuation(int id)
        {
            Console.WriteLine(new { continuation = id, System.Environment.CurrentManagedThreadId });
            Thread.Sleep(1000);
        }

        // await-style continuation
        static async Task ContinueAsync(int id, Task task)
        {
            await task.ConfigureAwait(false);
            Continuation(id);
        }

        // ContinueWith-style continuation
        static Task ContinueWith(int id, Task task)
        {
            return task.ContinueWith(
                t => Continuation(id),
                CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
        }
    }
}

Note how all continuations run synchronously on the same thread where TrySetResult has been called:

start, { CurrentManagedThreadId = 1 }
before SetResult, { CurrentManagedThreadId = 3 }
{ continuation = 1, CurrentManagedThreadId = 3 }
{ continuation = 2, CurrentManagedThreadId = 3 }
{ continuation = 3, CurrentManagedThreadId = 3 }
{ continuation = 4, CurrentManagedThreadId = 3 }
{ continuation = 5, CurrentManagedThreadId = 3 }
{ continuation = 6, CurrentManagedThreadId = 3 }

Now what if we don't want this to happen, and we want each continuation to run asynchronously (i.e., in parallel with other continuations and possibly on another thread, in the absence of any synchronization context)?

There's a trick that could do it for await-style continuations, by installing a fake temporary synchronization context (more details here):

public static class TaskExt
{
    class SimpleSynchronizationContext : SynchronizationContext
    {
        internal static readonly SimpleSynchronizationContext Instance = new SimpleSynchronizationContext();
    };

    public static void TrySetResult<TResult>(this TaskCompletionSource<TResult> @this, TResult result, bool asyncAwaitContinuations)
    {
        if (!asyncAwaitContinuations)
        {
            @this.TrySetResult(result);
            return;
        }

        var sc = SynchronizationContext.Current;
        SynchronizationContext.SetSynchronizationContext(SimpleSynchronizationContext.Instance);
        try
        {
            @this.TrySetResult(result);
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(sc);
        }
    }
}

Now, using tcs.TrySetResult(true, asyncAwaitContinuations: true) in our test code:

start, { CurrentManagedThreadId = 1 }
before SetResult, { CurrentManagedThreadId = 3 }
{ continuation = 1, CurrentManagedThreadId = 3 }
{ continuation = 2, CurrentManagedThreadId = 3 }
{ continuation = 3, CurrentManagedThreadId = 3 }
{ continuation = 4, CurrentManagedThreadId = 4 }
{ continuation = 5, CurrentManagedThreadId = 5 }
{ continuation = 6, CurrentManagedThreadId = 6 }

Note how await continuations now run in parallel (albeit, still after all synchronous ContinueWith continuations).

This asyncAwaitContinuations: true logic is a hack and it works for await continuations only. The new RunContinuationsAsynchronously makes it work consistently for any kind of continuation attached to TaskCompletionSource.Task.

Another nice aspect of RunContinuationsAsynchronously is that any await-style continuations scheduled to be resumed on a specific synchronization context will run on that context asynchronously (using SynchronizationContext.Post), even if TCS.Task completes on the same context (unlike the current behavior of TCS.SetResult). ContinueWith-style continuations will also be run asynchronously by their corresponding task schedulers (most often, TaskScheduler.Default or TaskScheduler.FromCurrentSynchronizationContext). They won't be inlined via TaskScheduler.TryExecuteTaskInline. I believe Stephen Toub has clarified that in the comments to his blog post, and it can also be seen here in CoreCLR's Task.cs.
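As a rough check of that claim, the sketch below attaches one ContinueWith-style continuation (with ExecuteSynchronously) and one await-style continuation to the same task, completes the task from the main thread, and counts how many of the two ran on that very thread. ContinuationThreadSketch, Record, AwaitStyle and CountInlined are hypothetical names, and the sketch assumes a console application without a synchronization context on .NET 4.6 or later.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class ContinuationThreadSketch
{
    static int s_ranOnCompletingThread;   // continuations that ran on the SetResult thread

    static void Record(int completingThreadId)
    {
        if (Environment.CurrentManagedThreadId == completingThreadId)
            Interlocked.Increment(ref s_ranOnCompletingThread);
    }

    // await-style continuation
    static async Task AwaitStyle(Task task, int completingThreadId)
    {
        await task.ConfigureAwait(false);
        Record(completingThreadId);
    }

    static int CountInlined(TaskCreationOptions options)
    {
        s_ranOnCompletingThread = 0;
        int me = Environment.CurrentManagedThreadId;
        var tcs = new TaskCompletionSource<bool>(options);

        // ContinueWith-style continuation that asks to be run synchronously
        Task viaContinueWith = tcs.Task.ContinueWith(
            t => Record(me),
            CancellationToken.None,
            TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);
        Task viaAwait = AwaitStyle(tcs.Task, me);

        tcs.SetResult(true);

        // Wait on a WhenAll promise so that waiting cannot itself
        // pull a continuation onto this thread.
        Task.WhenAll(viaContinueWith, viaAwait).Wait();
        return Volatile.Read(ref s_ranOnCompletingThread);
    }

    static void Main()
    {
        Console.WriteLine("default options, ran on completing thread: " +
            CountInlined(TaskCreationOptions.None));
        Console.WriteLine("RunContinuationsAsynchronously, ran on completing thread: " +
            CountInlined(TaskCreationOptions.RunContinuationsAsynchronously));
    }
}
```

With the default options both continuations should be counted, matching the first test output above. With RunContinuationsAsynchronously neither should be, since even the ExecuteSynchronously request is not honored by inlining.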

Why should we be worrying about imposing asynchrony on all continuations?

I usually need it when I deal with async methods which execute cooperatively (co-routines).

A simple example is pausable asynchronous processing: one async process pauses/resumes the execution of another. Their execution workflows synchronize at certain await points, and TaskCompletionSource is used for this kind of synchronization, directly or indirectly.

Below is some ready-to-play-with sample code which uses an adaptation of Stephen Toub's PauseTokenSource. Here, one async method StartAndControlWorkAsync starts and periodically pauses/resumes another async method, DoWorkAsync. Try changing asyncAwaitContinuations: true to asyncAwaitContinuations: false and see the logic being completely broken:

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp
{
    class Program
    {
        static void Main()
        {
            StartAndControlWorkAsync(CancellationToken.None).Wait();
        }

        // Do some work which can be paused/resumed
        public static async Task DoWorkAsync(PauseToken pause, CancellationToken token)
        {
            try
            {
                var step = 0;
                while (true)
                {
                    token.ThrowIfCancellationRequested();
                    Console.WriteLine("Working, step: " + step++);
                    await Task.Delay(1000).ConfigureAwait(false);
                    Console.WriteLine("Before await pause.WaitForResumeAsync()");
                    await pause.WaitForResumeAsync();
                    Console.WriteLine("After await pause.WaitForResumeAsync()");
                }
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception: {0}", e);
                throw;
            }
        }

        // Start DoWorkAsync and pause/resume it
        static async Task StartAndControlWorkAsync(CancellationToken token)
        {
            var pts = new PauseTokenSource();
            var task = DoWorkAsync(pts.Token, token);

            while (true)
            {
                token.ThrowIfCancellationRequested();

                Console.WriteLine("Press enter to pause...");
                Console.ReadLine();

                Console.WriteLine("Before pause requested");
                await pts.PauseAsync();
                Console.WriteLine("After pause requested, paused: " + pts.IsPaused);

                Console.WriteLine("Press enter to resume...");
                Console.ReadLine();

                Console.WriteLine("Before resume");
                pts.Resume();
                Console.WriteLine("After resume");
            }
        }

        // Based on Stephen Toub's PauseTokenSource
        // http://blogs.msdn.com/b/pfxteam/archive/2013/01/13/cooperatively-pausing-async-methods.aspx
        // the main difference is to make sure that when the consumer-side code - which requested the pause - continues, 
        // the producer-side code has already reached the paused (awaiting) state.
        // E.g. a media player "Pause" button is clicked, gets disabled, playback stops, 
        // and only then "Resume" button gets enabled

        public class PauseTokenSource
        {
            internal static readonly Task s_completedTask = Task.Delay(0);

            readonly object _lock = new Object();

            bool _paused = false;

            TaskCompletionSource<bool> _pauseResponseTcs;
            TaskCompletionSource<bool> _resumeRequestTcs;

            public PauseToken Token { get { return new PauseToken(this); } }

            public bool IsPaused
            {
                get
                {
                    lock (_lock)
                        return _paused;
                }
            }

            // request a resume
            public void Resume()
            {
                TaskCompletionSource<bool> resumeRequestTcs = null;

                lock (_lock)
                {
                    resumeRequestTcs = _resumeRequestTcs;
                    _resumeRequestTcs = null;

                    if (!_paused)
                        return;
                    _paused = false;
                }

                if (resumeRequestTcs != null)
                    resumeRequestTcs.TrySetResult(true, asyncAwaitContinuations: true);
            }

            // request a pause (completes when paused state confirmed)
            public Task PauseAsync()
            {
                Task responseTask = null;

                lock (_lock)
                {
                    if (_paused)
                        return _pauseResponseTcs.Task;
                    _paused = true;

                    _pauseResponseTcs = new TaskCompletionSource<bool>();
                    responseTask = _pauseResponseTcs.Task;

                    _resumeRequestTcs = null;
                }

                return responseTask;
            }

            // wait for resume request
            internal Task WaitForResumeAsync()
            {
                Task resumeTask = s_completedTask;
                TaskCompletionSource<bool> pauseResponseTcs = null;

                lock (_lock)
                {
                    if (!_paused)
                        return s_completedTask;

                    _resumeRequestTcs = new TaskCompletionSource<bool>();
                    resumeTask = _resumeRequestTcs.Task;

                    pauseResponseTcs = _pauseResponseTcs;

                    _pauseResponseTcs = null;
                }

                if (pauseResponseTcs != null)
                    pauseResponseTcs.TrySetResult(true, asyncAwaitContinuations: true);

                return resumeTask;
            }
        }

        // consumer side
        public struct PauseToken
        {
            readonly PauseTokenSource _source;

            public PauseToken(PauseTokenSource source) { _source = source; }

            public bool IsPaused { get { return _source != null && _source.IsPaused; } }

            public Task WaitForResumeAsync()
            {
                return IsPaused ?
                    _source.WaitForResumeAsync() :
                    PauseTokenSource.s_completedTask;
            }
        }


    }

    public static class TaskExt
    {
        class SimpleSynchronizationContext : SynchronizationContext
        {
            internal static readonly SimpleSynchronizationContext Instance = new SimpleSynchronizationContext();
        };

        public static void TrySetResult<TResult>(this TaskCompletionSource<TResult> @this, TResult result, bool asyncAwaitContinuations)
        {
            if (!asyncAwaitContinuations)
            {
                @this.TrySetResult(result);
                return;
            }

            var sc = SynchronizationContext.Current;
            SynchronizationContext.SetSynchronizationContext(SimpleSynchronizationContext.Instance);
            try
            {
                @this.TrySetResult(result);
            }
            finally
            {
                SynchronizationContext.SetSynchronizationContext(sc);
            }
        }
    }
}

I didn't want to use Task.Run(() => tcs.SetResult(result)) here, because it would be redundant to push continuations to ThreadPool when they're already scheduled to run asynchronously on a UI thread with a proper synchronization context. At the same time, if both StartAndControlWorkAsync and DoWorkAsync run on the same UI synchronization context, we'd also have a stack dive (if tcs.SetResult(result) is used without Task.Run or SynchronizationContext.Post wrapping).

Now, RunContinuationsAsynchronously is probably the best solution to this problem.