Efficient signaling Tasks for TPL completions on f

2020-08-26 03:26发布

问题:

I'm working on a simulation system that, among other things, allows for the execution of tasks in discrete simulated time steps. Execution all occurs in the context of the simulation thread, but, from the perspective of an 'operator' using the system, they wish to behave asynchronously. Thankfully the TPL, with the handy 'async/await' keywords, makes this fairly straightforward. I have a primitive method on the Simulation like this:

    public Task CycleExecutedEvent()
    {
        lock (_cycleExecutedBroker)
        {
            if (!IsRunning) throw new TaskCanceledException("Simulation has been stopped");
            return _cycleExecutedBroker.RegisterForCompletion(CycleExecutedEventName);
        }
    }

This is basically creating a new TaskCompletionSource and then returning a Task. The purpose of this Task is to execute its continuation when the new 'ExecuteCycle' on the simulation occurs.

I then have some extension methods like this:

    public static async Task WaitForDuration(this ISimulation simulation, double duration)
    {
        double startTime = simulation.CurrentSimulatedTime;
        do
        {
            await simulation.CycleExecutedEvent();
        } while ((simulation.CurrentSimulatedTime - startTime) < duration);
    }

    public static async Task WaitForCondition(this ISimulation simulation, Func<bool> condition)
    {
        do
        {
            await simulation.CycleExecutedEvent();
        } while (!condition());
    }

These are very handy, then, for building sequences from an 'operator' perspective, taking actions based on conditions and waiting for periods of simulated time. The issue I'm running into is that CycleExecuted occurs very frequently (roughly every few milliseconds if I'm running at fully accelerated speed). Because these 'wait' helper methods register a new 'await' on each cycle, this causes a large turnover in TaskCompletionSource instances.

I've profiled my code and I've found that roughly 5.5% of my total CPU time is spent within these completions, of which only a negligible percentage is spent in the 'active' code. Effectively all of the time is spent registering new completions while waiting for the triggering conditions to be valid.

My question: how can I improve performance here while still retaining the convenience of the async/await pattern for writing 'operator behaviors'? I'm thinking I need something like a lighter-weight and/or reusable TaskCompletionSource, given that the triggering event occurs so frequently.


I've been doing a bit more research and it sounds like a good option would be to create a custom implementation of the Awaitable pattern, which could tie directly into the event, eliminating the need for a bunch of TaskCompletionSource and Task instances. The reason it could be useful here is that there are a lot of different continuations awaiting the CycleExecutedEvent and they need to await it frequently. So ideally I'm looking at a way to just queue up continuation callbacks, then call back everything in the queue whenever the event occurs. I'll keep digging, but I welcome any help if folks know a clean way to do this.


For anybody browsing this question in the future, here is the custom awaiter I put together:

public sealed class CycleExecutedAwaiter : INotifyCompletion
{
    private readonly List<Action> _continuations = new List<Action>();

    public bool IsCompleted
    {
        get { return false; }
    }

    public void GetResult()
    {
    }

    public void OnCompleted(Action continuation)
    {
        _continuations.Add(continuation);
    }

    public void RunContinuations()
    {
        var continuations = _continuations.ToArray();
        _continuations.Clear();
        foreach (var continuation in continuations)
            continuation();
    }

    public CycleExecutedAwaiter GetAwaiter()
    {
        return this;
    }
}

And in the Simulator:

    private readonly CycleExecutedAwaiter _cycleExecutedAwaiter = new CycleExecutedAwaiter();

    public CycleExecutedAwaiter CycleExecutedEvent()
    {
        if (!IsRunning) throw new TaskCanceledException("Simulation has been stopped");
        return _cycleExecutedAwaiter;
    }

It's a bit funny, as the awaiter never reports Complete, but fires continues to call completions as they are registered; still, it works well for this application. This reduces the CPU overhead from 5.5% to 2.1%. It will likely still require some tweaking, but it's a nice improvement over the original.

回答1:

The await keyword doesn't work just on Tasks, it works on anything that follows the awaitable pattern. For details, see Stephen Toub's article await anything;.

The short version is that the type has to have a method GetAwaiter() that returns a type that implements INotifyCompletion and also has IsCompleted property and GetResult() method (void-returning, if the await expression shouldn't have a value). For an example, see TaskAwaiter.

If you create your own awaitable, you could return the same object every time, avoiding the overhead of allocating many TaskCompletionSources.



回答2:

Here is my version of ReusableAwaiter simulating TaskCompletionSource

public sealed class ReusableAwaiter<T> : INotifyCompletion
{
    private Action _continuation = null;
    private T _result = default(T);
    private Exception _exception = null;

    public bool IsCompleted
    {
        get;
        private set;
    }

    public T GetResult()
    {
        if (_exception != null)
            throw _exception;
        return _result;
    }

    public void OnCompleted(Action continuation)
    {
        if (_continuation != null)
            throw new InvalidOperationException("This ReusableAwaiter instance has already been listened");
        _continuation = continuation;
    }

    /// <summary>
    /// Attempts to transition the completion state.
    /// </summary>
    /// <param name="result"></param>
    /// <returns></returns>
    public bool TrySetResult(T result)
    {
        if (!this.IsCompleted)
        {
            this.IsCompleted = true;
            this._result = result;

            if (_continuation != null)
                _continuation();
            return true;
        }
        return false;
    }

    /// <summary>
    /// Attempts to transition the exception state.
    /// </summary>
    /// <param name="result"></param>
    /// <returns></returns>
    public bool TrySetException(Exception exception)
    {
        if (!this.IsCompleted)
        {
            this.IsCompleted = true;
            this._exception = exception;

            if (_continuation != null)
                _continuation();
            return true;
        }
        return false;
    }

    /// <summary>
    /// Reset the awaiter to initial status
    /// </summary>
    /// <returns></returns>
    public ReusableAwaiter<T> Reset()
    {
        this._result = default(T);
        this._continuation = null;
        this._exception = null;
        this.IsCompleted = false;
        return this;
    }

    public ReusableAwaiter<T> GetAwaiter()
    {
        return this;
    }
}

And here is the test code.

class Program
{
    static readonly ReusableAwaiter<int> _awaiter = new ReusableAwaiter<int>();

    static void Main(string[] args)
    {
        Task.Run(() => Test());

        Console.ReadLine();
        _awaiter.TrySetResult(22);
        Console.ReadLine();
        _awaiter.TrySetException(new Exception("ERR"));

        Console.ReadLine();
    }

    static async void Test()
    {

        int a = await AsyncMethod();
        Console.WriteLine(a);
        try
        {
            await AsyncMethod();
        }
        catch(Exception ex)
        {
            Console.WriteLine(ex.Message);
        }

    }

    static  ReusableAwaiter<int> AsyncMethod()
    {
        return _awaiter.Reset();
    }

}


回答3:

Do you really need to receive the WaitForDuration-event on a different thread? If not, you could just register a callback (or an event) with _cycleExecutedBroker and receive notification synchronously. In the callback you can test any condition you like and only if that condition turns out to be true, notify a different thread (using a task or message or whatever mechanism). I understand the condition you test for rarely evaluates to true, so you avoid most cross-thread calls that way.

I guess the gist of my answer is: Try to reduce the amount of cross-thread messaging by moving computation to the "source" thread.