.NET question about asynchronous socket operations

2019-02-16 17:08发布

问题:

I've been looking everywhere for examples on how to deal with TCP message framing. I see many examples where NetworkStreams are passed into a StreamReader or StreamWriter object and then use ReadLine or WriteLine methods for '\n' delimited messages. My application protocol contains messages ending in '\n' so the NetworkStream seems to be the way to go. However, I can't find any specific examples on the proper way to handle all of this in combination with asynchronous sockets. When ReceiveCallback() is called below, how do I implement the NetworkStream and StreamReader classes to deal with message framing? According to what I've read, I may get part of one message in one receive and the rest of the message (including the '\n') in the next receive. Does this mean I could get the end of one message and part of the next message? Surely, there must be an easier way to handle this.

I have the following code:

    private void StartRead(Socket socket)
    {
        try
        {
            StateObject state = new StateObject();
            state.AsyncSocket = socket;

            socket.BeginReceive(state.Buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReceiveCallback), state);
        }
        catch (SocketException)
        {
            m_Socket.Shutdown(SocketShutdown.Both);
            Disconnect();
        }
    }

    private void ReceiveCallback(IAsyncResult ar)
    {
        try
        {
            StateObject state = (StateObject)ar.AsyncState;

            int bytes_read = state.AsyncSocket.EndReceive(ar);

            char[] chars = new char[bytes_read + 1];
            System.Text.Decoder decoder = System.Text.Encoding.UTF8.GetDecoder();
            int charLength = decoder.GetChars(state.Buffer, 0, bytes_read, chars, 0);

            String data = new String(chars);

            ParseMessage(data);

            StartRead(state.AsyncSocket);
        }
        catch (SocketException)
        {
            m_Socket.Shutdown(SocketShutdown.Both);
            Disconnect();
        }
    }

回答1:

Basically you create a buffer, and each time you receive data, you add that data to the buffer and determine if you have already received one or more full messages.

Between ReceiveCallback and StartRead you won't receive any asynchronous messages (incoming data will automatically be buffered on the socket level) so it's the ideal place to check for full messages and remove them from the buffer.

All variations are possible, including receiving the end of message 1, plus message 2, plus the beginning of message 3, all in one chunk.

I don't recommend UTF8-decoding the chunk, as one UTF8-character may consist of two bytes, and if they get split between chunks your data could be corrupted. You could keep a byte[]-buffer (MemoryStream?) and split messages on the 0x0A byte in that case.



回答2:

Prefixing the chunks with a length is better than using a separator character. You don't have to deal with any sort of escaping in order to send data with a newline that way.

This answer might not be relevant to you now, because it uses features from the AsyncCTP, which will only be in the next version of .net. However, it does make things much more concise. Essentially you write exactly the code you'd do for the synchronous case, but insert 'await' statements where there are asynchronous calls.

    public static async Task<Byte[]> ReadChunkAsync(this Stream me) {
        var size = BitConverter.ToUInt32(await me.ReadExactAsync(4), 0);
        checked {
            return await me.ReadExactAsync((int)size);
        }
    }

    public static async Task<Byte[]> ReadExactAsync(this Stream me, int count) {
        var buf = new byte[count];
        var t = 0;
        while (t < count) {
            var n = await me.ReadAsync(buf, t, count - t);
            if (n <= 0) {
                if (t > 0) throw new IOException("End of stream (fragmented)");
                throw new IOException("End of stream");
            }
            t += n;
        }
        return buf;
    }

    public static void WriteChunk(this Stream me, byte[] buffer, int offset, int count) {
        me.Write(BitConverter.GetBytes(count), 0, 4);
        me.Write(buffer, offset, count);
    }


回答3:

OK here's what I ended up doing. I created a reader thread that creates a NetworkStream and a StreamReader based on the network stream. Then I use StreamReader.ReadLine to read in the lines that way. It's a synchronous call but it is in its own thread. It seems to work a lot better. I had to implement this since that's our protocol for the application (newline-delimited messages). I know other people will be looking around like hell for the answer like I did here's the relevant read code in my Client class:

public class Client
{
    Socket              m_Socket;

    EventWaitHandle     m_WaitHandle;
    readonly object     m_Locker;
    Queue<IEvent>       m_Tasks;
    Thread              m_Thread;

    Thread              m_ReadThread;

    public Client()
    {
        m_WaitHandle = new AutoResetEvent(false);
        m_Locker = new object();
        m_Tasks = new Queue<IEvent>();

        m_Thread = new Thread(Run);
        m_Thread.IsBackground = true;
        m_Thread.Start();
    }

    public void EnqueueTask(IEvent task)
    {
        lock (m_Locker)
        {
            m_Tasks.Enqueue(task);
        }

        m_WaitHandle.Set();
    }

    private void Run()
    {
        while (true)
        {
            IEvent task = null;

            lock (m_Locker)
            {
                if (m_Tasks.Count > 0)
                {
                    task = m_Tasks.Dequeue();

                    if (task == null)
                    {
                        return;
                    }
                }
            }

            if (task != null)
            {
                task.DoTask(this);
            }
            else
            {
                m_WaitHandle.WaitOne();
            }
        }
    }

    public void Connect(string hostname, int port)
    {
        try
        {
            m_Socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

            IPAddress[] IPs = Dns.GetHostAddresses(hostname);

            m_Socket.BeginConnect(IPs, port, new AsyncCallback(ConnectCallback), m_Socket);
        }
        catch (SocketException)
        {
            m_Socket.Close();
            OnConnect(false, "Unable to connect to server.");
        }
    }

    private void ConnectCallback(IAsyncResult ar)
    {
        try
        {
            Socket socket = (Socket)ar.AsyncState;

            socket.EndConnect(ar);

            OnConnect(true, "Successfully connected to server.");

            m_ReadThread = new Thread(new ThreadStart(this.ReadThread));
            m_ReadThread.Name = "Read Thread";
            m_ReadThread.IsBackground = true;
            m_ReadThread.Start();
        }
        catch (SocketException)
        {
            m_Socket.Close();
            OnConnect(false, "Unable to connect to server.");
        }
    }

    void ReadThread()
    {
        NetworkStream networkStream = new NetworkStream(m_Socket);
        StreamReader reader = new StreamReader(networkStream);

        while (true)
        {
            try
            {
                String message = reader.ReadLine();

                // To keep the code thread-safe, enqueue a task in the CLient class thread to parse the message received.
                EnqueueTask(new ServerMessageEvent(message));
            }
            catch (IOException)
            {
                // The code will reach here if the server disconnects from the client. Make sure to cleanly shutdown...
                Disconnect();
                break;
            }
        }
    }

    ... Code for sending/parsing the message in the Client class thread.
}