Is it acceptable to use TaskCompletionSource as a

2019-05-01 04:24发布

My code handles a TCP connection to a remote host, with a ConcurrentQueue to store outgoing messages. It's intended to run in a single thread. The lifetime of the connection is contained within RunAsync while a separate object contains the "public state" of the connection:

class PublicState
{
    internal readonly ConcurrentQueue<Message> OutgoingMessageQueue = new ConcurrentQueue<Message>();
    internal TaskCompletionSource<Object> OutgoingMessageTcs = null;

    internal readonly TaskCompletionSource<Object> ConnectedTcs = new TaskCompletionSource<Object>();

    public void EnqueueMessages(IEnumerable<Message> messages)
    {
        foreach( Message m in messages ) this.OutgoingMessageQueue.Enqueue( m);
        if( this.OutgoingMessageTcs == null ) this.OutgoingMessageTcs = new TaskCompletionSource<Object>();
        this.OutgoingMessageTcs.SetResult( null );
    }
}

static async Task RunAsync(IPEndPoint endPoint, PublicState state)
{
    using( TcpClient tcp = new TcpClient() )
    {
        await tcp.ConnectAsync( endPoint.Address, endPoint.Port ).ConfigureAwait(false);

        Byte[] reusableBuffer = new Byte[ 4096 ];

        using( NetworkStream ns = tcp.GetStream() )
        {
            state.ConnectedTcs.SetResult( null );

            Task<Int32> nsReadTask = null;

            while( tcp.Connected )
            {
                if( !state.writeQueue.IsEmpty )
                {
                    await WriteMessagesAsync( ... ).ConfigureAwait( false );
                }

                if( ns.DataAvailable )
                {
                    await ReadMessagesAsync( ... ).ConfigureAwait( false );
                }

                // Wait for new data to arrive from remote host or for new messages to send:
                if( state.OutgoingMessageTcs == null ) state.OutgoingMessageTcs = new TaskCompletionSource<Object>();
                if( nsReadTask == null ) nsReadTask = ns.ReadAsync( reusableBuffer, 0, 0 ).ConfigureAwait( false );

                Task c = await Task.WhenAny( state.OutgoingMessageTcs,  nsReadTask ).ConfigureAwait( false );
                if( c == state.OutgoingMessageTcs.Task ) state.OutgoingMessageTcs = null;
                else if( c == nsReadTask ) nsReadTask = null;
            }
        }
    }
}

Used like this:

public async Task Main(String[] args)
{
    PublicState state = new PublicState();
    Task clientTask = Client.RunAsync( new IPEndPoint(args[0]), state );

    await state.ConnectedTcs.Task; // awaits until TCP connection is established

    state.EnqueueMessage( new Message("foo") );
    state.EnqueueMessage( new Message("bar") );
    state.EnqueueMessage( new Message("baz") );

    await clientTask; // awaits until the TCP connection is closed
}

This code works, but I don't like it: it feels like I'm using TaskCompletionSource which is meant to represent an actual Task or some kind of background operation, whereas I'm really using TaskCompletionSource as a kind of cheap EventWaitHandle. I'm not using EventWaitHandle because it's IDisposable (I don't want to risk leaking native resources) and it lacks a WaitAsync or WaitOneAsync method. I could use SemaphoreSlim (which is awaitable, but wraps an EventWaitHandle) but my code doesn't really represent a good use of a semaphore.

Is my use of TaskCompletionSource<T> acceptable, or is there a better way to "un-await" execution in RunAsync when an item is added to OutgoingMessageQueue?

Another reason I feel it's "wrong" is that TaskCompletionSource<T> can only be used once, then it needs replacing. I'm keen to avoid extraneous allocations.

2条回答
smile是对你的礼貌
2楼-- · 2019-05-01 04:57

If I understood you correctly - TPL BufferBlock might be what you need. Analog of current Enqueue is Post, and you can receive next value via ReceiveAsync extension method.

So with BufferBlock your code becomes something like this:

class PublicState {
    internal readonly BufferBlock<Message> OutgoingMessageQueue = new BufferBlock<Message>();
    internal readonly TaskCompletionSource<Object> ConnectedTcs = new TaskCompletionSource<Object>();

    public void EnqueueMessage(Message message) {
        this.OutgoingMessageQueue.Post(message);
    }
}

static async Task RunAsync(IPEndPoint endPoint, PublicState state) {
    using (TcpClient tcp = new TcpClient()) {
        await tcp.ConnectAsync(endPoint.Address, endPoint.Port).ConfigureAwait(false);

        Byte[] reusableBuffer = new Byte[4096];

        using (NetworkStream ns = tcp.GetStream()) {
            state.ConnectedTcs.SetResult(null);

            Task<Int32> nsReadTask = null;
            Task<Message> newMessageTask = null;
            while (tcp.Connected) {
                // Wait for new data to arrive from remote host or for new messages to send:
                if (nsReadTask == null)
                    nsReadTask = ns.ReadAsync(reusableBuffer, 0, 0);
                if (newMessageTask == null)
                    newMessageTask = state.OutgoingMessageQueue.ReceiveAsync();
                var completed = await Task.WhenAny(nsReadTask, newMessageTask).ConfigureAwait(false);
                if (completed == newMessageTask) {
                    var result = await newMessageTask;
                    // do stuff
                    newMessageTask = null;
                }
                else {
                    var bytesRead = await nsReadTask;
                    nsReadTask = null;
                }
            }
        }
    }
}

As a bonus, this version is (I think) thread-safe, while your current version is not, because you are doing non-thread-safe things with OutgoingMessageTcs from potentially multiple threads (thread of RunAsync and thread of EnqueueMessages caller).

If for some reason you don't like BufferBlock - you can use AsyncCollection from Nito.AsyncEx nuget package in exactly the same way. Initialization becomes:

internal readonly AsyncCollection<Message> OutgoingMessageQueue = new AsyncCollection<Message>(new ConcurrentQueue<Message>());

And fetching:

if (newMessageTask == null)
   newMessageTask = state.OutgoingMessageQueue.TakeAsync();
查看更多
聊天终结者
3楼-- · 2019-05-01 04:58

To back up what others have mentioned, it does look like Microsoft's documentation mentions and even encourages developing a Semaphore class which is written on top of the Task objects here:

You can also build an asynchronous semaphore that does not rely on wait handles and instead works completely with tasks. To do this, you can use techniques such as those discussed in Consuming the Task-based Asynchronous Pattern for building data structures on top of Task.

This does make me wonder why such a prepackaged class does not already exist, but it certainly shows that this is fine.

查看更多
登录 后发表回答