NetworkStream.ReadAsync with a cancellation token

2019-01-10 21:10发布

Here the proof.
Any idea what is wrong in this code ?

    [TestMethod]
    public void TestTest()
    {
        var tcp = new TcpClient() { ReceiveTimeout = 5000, SendTimeout = 20000 };
        tcp.Connect(IPAddress.Parse("176.31.100.115"), 25);
        bool ok = Read(tcp.GetStream()).Wait(30000);
        Assert.IsTrue(ok);
    }

    async Task Read(NetworkStream stream)
    {
        using (var cancellationTokenSource = new CancellationTokenSource(5000))
        {
            int receivedCount;
            try
            {
                var buffer = new byte[1000];
                receivedCount = await stream.ReadAsync(buffer, 0, 1000, cancellationTokenSource.Token);
            }
            catch (TimeoutException e)
            {
                receivedCount = -1;
            }
        }
    }

5条回答
别忘想泡老子
2楼-- · 2019-01-10 21:20

Per the description in Softlion's answer:

Combine the async call with a delay task (Task.Delay) using Task.WaitAny. When the delay elapses before the io task, close the stream. This will force the task to stop. You should handle the async exception on the io task correctly. And you should add a continuation task for both the dealy task and the io task.

I've made some code that gives you the async read with timeout:

using System;
using System.Net.Sockets;
using System.Threading.Tasks;

namespace ConsoleApplication2013
{
    class Program
    {
        /// <summary>
        /// Does an async read on the supplied NetworkStream and will timeout after the specified milliseconds.
        /// </summary>
        /// <param name="ns">NetworkStream object on which to do the ReadAsync</param>
        /// <param name="s">Socket associated with ns (needed to close to abort the ReadAsync task if the timeout occurs)</param>
        /// <param name="timeoutMillis">number of milliseconds to wait for the read to complete before timing out</param>
        /// <param name="buffer"> The buffer to write the data into</param>
        /// <param name="offset">The byte offset in buffer at which to begin writing data from the stream</param>
        /// <param name="amountToRead">The maximum number of bytes to read</param>
        /// <returns>
        /// a Tuple where Item1 is true if the ReadAsync completed, and false if the timeout occurred,
        /// and Item2 is set to the amount of data that was read when Item1 is true
        /// </returns>
        public static async Task<Tuple<bool, int>> ReadWithTimeoutAsync(NetworkStream ns, Socket s, int timeoutMillis, byte[] buffer, int offset, int amountToRead)
        {
            Task<int> readTask = ns.ReadAsync(buffer, offset, amountToRead);
            Task timeoutTask = Task.Delay(timeoutMillis);

            int amountRead = 0;

            bool result = await Task.Factory.ContinueWhenAny<bool>(new Task[] { readTask, timeoutTask }, (completedTask) =>
            {
                if (completedTask == timeoutTask) //the timeout task was the first to complete
                {
                    //close the socket (unless you set ownsSocket parameter to true in the NetworkStream constructor, closing the network stream alone was not enough to cause the readTask to get an exception)
                    s.Close();
                    return false; //indicate that a timeout occurred
                }
                else //the readTask completed
                {
                    amountRead = readTask.Result;
                    return true;
                }
            });

            return new Tuple<bool, int>(result, amountRead);
        }

        #region sample usage
        static void Main(string[] args)
        {
            Program p = new Program();
            Task.WaitAll(p.RunAsync());
        }

        public async Task RunAsync()
        {
            Socket s = new Socket(SocketType.Stream, ProtocolType.Tcp);

            Console.WriteLine("Connecting...");
            s.Connect("127.0.0.1", 7894);  //for a simple server to test the timeout, run "ncat -l 127.0.0.1 7894"
            Console.WriteLine("Connected!");

            NetworkStream ns = new NetworkStream(s);

            byte[] buffer = new byte[1024];
            Task<Tuple<bool, int>> readWithTimeoutTask = Program.ReadWithTimeoutAsync(ns, s, 3000, buffer, 0, 1024);
            Console.WriteLine("Read task created");

            Tuple<bool, int> result = await readWithTimeoutTask;

            Console.WriteLine("readWithTimeoutTask is complete!");
            Console.WriteLine("Read succeeded without timeout? " + result.Item1 + ";  Amount read=" + result.Item2);
        }
        #endregion
    }
}
查看更多
forever°为你锁心
3楼-- · 2019-01-10 21:23

I finally found a workaround. Combine the async call with a delay task (Task.Delay) using Task.WaitAny. When the delay elapses before the io task, close the stream. This will force the task to stop. You should handle the async exception on the io task correctly. And you should add a continuation task for both the delayed task and the io task.

It also work with tcp connections. Closing the connection in another thread (you could consider it is the delay task thread) forces all async tasks using/waiting for this connection to stop.

--EDIT--

Another cleaner solution suggested by @vtortola: use the cancellation token to register a call to stream.Close:

async Task Read(NetworkStream stream)
{
    using (var cancellationTokenSource = new CancellationTokenSource(5000))
    {
        using(cancellationTokenSource.Token.Register(() => stream.Close()))
        {
            int receivedCount;
            try
            {
                var buffer = new byte[1000];
                receivedCount = await stream.ReadAsync(buffer, 0, 1000, cancellationTokenSource.Token);
            }
            catch (TimeoutException e)
            {
                receivedCount = -1;
            }
        }
    }
}
查看更多
做个烂人
4楼-- · 2019-01-10 21:24

There are a few problems there that pop out:

  1. CancellationToken throws OperationCanceledException, not TimeoutException (cancellation is not always due to timeout).
  2. ReceiveTimeout doesn't apply, since you're doing an asynchronous read. Even if it did, you'd have a race condition between IOException and OperationCanceledException.
  3. Since you're synchronously connecting the socket, you'll want a high timeout on this test (IIRC, the default connection timeout is ~90 seconds, but can be changed as Windows monitors the network speeds).
  4. The correct way to test asynchronous code is with an asynchronous test:

    [TestMethod]
    public async Task TestTest()
    {
        var tcp = new TcpClient() { ReceiveTimeout = 5000, SendTimeout = 20000 };
        tcp.Connect(IPAddress.Parse("176.31.100.115"), 25);
        await Read(tcp.GetStream());
    }
    
查看更多
forever°为你锁心
5楼-- · 2019-01-10 21:36

Cancellation is cooperative. NetworkStream.ReadAsync must cooperate to be able to be cancelled. It is kind of hard for it to do that because that would potentially leave the stream in an undefined state. What bytes have already been read from the Windows TCP stack and what haven't? IO is not easily cancellable.

Reflector shows that NetworkStream does not override ReadAsync. This means that it will get the default behavior of Stream.ReadAsync which just throws the token away. There is no generic way Stream operations can be cancelled so the BCL Stream class does not even try (it cannot try - there is no way to do this).

You should set a timeout on the Socket.

查看更多
一夜七次
6楼-- · 2019-01-10 21:42

I know it's a bit late, but this is a simple thing I usually do to cancel ReadAsync() (in my case: NetworkStream) (Tested):

Task.Run(() => 
{
   //  This will create a new CancellationTokenSource, that will cancel itself after 30 seconds
   using (CancellationTokenSource TimeOut = new CancellationTokenSource(30*1000))
   {
       Task&lt;int&gt; r = Stream.ReadAsync(reply, 0, reply.Length);

        //   This will throw a OperationCanceledException
        r.Wait(TimeOut.Token);
    }

}

Edit: I have put that in another Task, to clarify.

查看更多
登录 后发表回答