.NET 4.5 ASync TCP Server Memory Leak - BeginRecei

Posted 2020-08-01 07:21

Question:

We needed a Windows Service that supported TCP communications with a number of clients, so I based it on the MSDN Async Example. The thing with the Microsoft example is that the client sends one message to the server, the server sends the message back and then closes the connection. Great!

Having blindly deployed this to our production and customer sites, we got reports that it had crashed. Looking at production, we noticed that after one day the memory usage had grown to just under 1GB before the service threw an OutOfMemoryException. Lots of testing here!

This happened with just one client connected. It sends a fairly large XML-based message (~1200 bytes) every second. Yes, every second.

The service then does some processing and sends a return XML message back to the client.

I've pulled the TCP client/server communications into a simple set of console applications to replicate the issue, mainly to eliminate other managed/unmanaged resources. I've been looking at this for a number of days now and have pulled all of my hair and teeth out.

In my example I am focusing on the following classes:

B2BSocketManager (Server Listener, Sender, Receiver)

NOTE: I have changed the code to return the readonly byte array whoopsy instead of the received message. I've also removed the new AsyncCallback(delegate) from the BeginReceive/BeginSend calls.

namespace Acme.OPC.Service.Net.Sockets
{
    using Acme.OPC.Service.Logging;
    using System;
    using System.Linq;
    using System.Net;
    using System.Net.Sockets;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;

    public class B2BSocketManager : ISocketSender
    {
        private ManualResetEvent allDone = new ManualResetEvent(false);
        private IPEndPoint _localEndPoint;
        private readonly byte[] whoopsy = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };

        public B2BSocketManager(IPAddress address, int port)
        {
            _localEndPoint = new IPEndPoint(address, port);
        }

        public void StartListening()
        {
            StartListeningAsync();
        }

        private async Task StartListeningAsync()
        {
            await System.Threading.Tasks.Task.Factory.StartNew(() => ListenForConnections());
        }

        public void ListenForConnections()
        {
            Socket listener = new Socket(_localEndPoint.Address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
            Log.Instance.Info("B2BSocketManager Listening on " + _localEndPoint.Address.ToString() + ":" + _localEndPoint.Port.ToString());

            try
            {
                listener.Bind(_localEndPoint);
                listener.Listen(100);

                while (true)
                {
                    allDone.Reset();

                    Log.Instance.Info("B2BSocketManager Waiting for a connection...");
                    listener.BeginAccept(new AsyncCallback(ConnectCallback), listener);
                    allDone.WaitOne();
                }
            }
            catch (Exception e)
            {
                Log.Instance.Info(e.ToString());
            }
        }

        public void ConnectCallback(IAsyncResult ar)
        {
            allDone.Set();

            Socket listener = (Socket)ar.AsyncState;
            Socket handler = listener.EndAccept(ar);
            handler.DontFragment = false;
            handler.ReceiveBufferSize = ClientSocket.BufferSize;

            Log.Instance.Info("B2BSocketManager Client has connected on " + handler.RemoteEndPoint.ToString());

            ClientSocket state = new ClientSocket();
            state.workSocket = handler;

            handler.BeginReceive(state.buffer, 0, ClientSocket.BufferSize, 0, new AsyncCallback(ReadCallback), state); // SocketFlags.None
        }

        public void ReadCallback(IAsyncResult ar)
        {
            String message = String.Empty;

            ClientSocket state = (ClientSocket)ar.AsyncState;
            Socket handler = state.workSocket;

            int bytesRead = handler.EndReceive(ar);
            if (bytesRead > 0)
            {
                Console.WriteLine("Received " + bytesRead + " at " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"));

                message = Encoding.ASCII.GetString(state.buffer, 0, bytesRead);

                if (!string.IsNullOrEmpty(message))
                {
                    Send(handler, message);
                }

                handler.BeginReceive(state.buffer, 0, ClientSocket.BufferSize, 0, ReadCallback, state);
            }
        }

        public void Send(Socket socket, string data)
        {
            // just hard-coding the whoopsy readonly byte array (the data argument is ignored)
            socket.BeginSend(whoopsy, 0, whoopsy.Length, 0, SendCallback, socket);
        }

        private void SendCallback(IAsyncResult ar)
        {
            Socket state = (Socket)ar.AsyncState;

            try
            {
                int bytesSent = state.EndSend(ar);
            }
            catch (Exception e)
            {
                Log.Instance.ErrorException("", e);
            }
        }
    }
}

ClientSender (Client Sender)

The client sends an XML string to the server every 250 milliseconds. I wanted to see how this would perform. The XML is slightly smaller than what we send on our live system and is just created using a formatted string.

namespace TestHarness
{
    using System;
    using System.Linq;
    using System.Net;
    using System.Net.Sockets;
    using System.Text;
    using System.Threading;

    class ClientSender
    {
        private static ManualResetEvent connectDone = new ManualResetEvent(false);
        private static ManualResetEvent receiveDone = new ManualResetEvent(false);
        private static ManualResetEvent sendDone = new ManualResetEvent(false);

        private static void StartSpamming(Socket client)
        {
            while(true)
            {
                string message = @"<request type=""da"">{0}{1}</request>" + Environment.NewLine;

                Send(client, string.Format(message, "Be someone" + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), String.Concat(Enumerable.Repeat("<test>Oooooooops</test>", 50))));

                Thread.Sleep(250);
            }
        }

        public static void Connect(EndPoint remoteEP)
        {
            Socket listener = new Socket(remoteEP.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
            listener.BeginConnect(remoteEP, new AsyncCallback(ConnectCallback), listener);

            connectDone.WaitOne();
        }

        private static void ConnectCallback(IAsyncResult ar)
        {
            try
            {
                // Retrieve the socket from the state object.
                Socket client = (Socket)ar.AsyncState;

                // Complete the connection.
                client.EndConnect(ar);

                Console.WriteLine("Socket connected to {0}", client.RemoteEndPoint.ToString());

                // Signal that the connection has been made.
                connectDone.Set();

                System.Threading.Tasks.Task.Factory.StartNew(() => StartSpamming(client));
            }
            catch (Exception e)
            {
                Console.WriteLine(e.ToString());
            }
        }

        private static void Send(Socket client, String data)
        {
            byte[] byteData = Encoding.ASCII.GetBytes(data);
            client.BeginSend(byteData, 0, byteData.Length, SocketFlags.None, new AsyncCallback(SendCallback), client);
        }

        private static void SendCallback(IAsyncResult ar)
        {
            try
            {
                Socket client = (Socket)ar.AsyncState;
                int bytesSent = client.EndSend(ar);
                Console.WriteLine("Sent {0} bytes to server " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), bytesSent);
                sendDone.Set();
            }
            catch (Exception e)
            {
                Console.WriteLine(e.ToString());
            }
        }

        private static void Receive(Socket client)
        {
            try
            {
                StateObject state = new StateObject();
                state.workSocket = client;

                client.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReceiveCallback), state);
            }
            catch (Exception e)
            {
                Console.WriteLine(e.ToString());
            }
        }

        private static void ReceiveCallback(IAsyncResult ar)
        {
            try
            {
                StateObject state = (StateObject)ar.AsyncState;
                Socket client = state.workSocket;
                int bytesRead = client.EndReceive(ar);
                if (bytesRead > 0)
                {
                    state.sb.Append(Encoding.ASCII.GetString(state.buffer, 0, bytesRead));
                    client.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReceiveCallback), state);
                }
                else
                {
                    if (state.sb.Length > 1)
                    {
                        string response = state.sb.ToString();
                    }
                    receiveDone.Set();
                }
            }
            catch (Exception e)
            {
                Console.WriteLine(e.ToString());
            }
        }
    }
}

State Class

All I wanted was a read buffer from which to strip out the message and try to load it as XML. That part has been removed from this cut-down version so that only the socket issues remain.

using System;
using System.Linq;
using System.Net.Sockets;

namespace Acme.OPC.Service.Net.Sockets
{
    public class ClientSocket
    {
        public Socket workSocket = null;
        public const int BufferSize = 4096;
        public readonly byte[] buffer = new byte[BufferSize];
    }
}
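
For context on the read buffer mentioned above: TCP delivers a byte stream, so a single receive may return part of a message or several messages at once. Since the test client ends each request with Environment.NewLine, the stripping step could look roughly like the sketch below. LineFramer is a hypothetical helper that is not part of the shared code, and treating the newline as the message delimiter is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper (not in the original code): accumulates received bytes
// and returns every complete newline-terminated message seen so far.
public class LineFramer
{
    private readonly StringBuilder _pending = new StringBuilder();

    // Call from the receive callback with the buffer and the EndReceive count.
    public List<string> Append(byte[] buffer, int bytesRead)
    {
        _pending.Append(Encoding.ASCII.GetString(buffer, 0, bytesRead));

        var messages = new List<string>();
        string text = _pending.ToString();
        int start = 0;
        int newline;
        while ((newline = text.IndexOf('\n', start)) >= 0)
        {
            // Trim the '\r' that Environment.NewLine adds on Windows.
            string message = text.Substring(start, newline - start).TrimEnd('\r');
            if (message.Length > 0)
            {
                messages.Add(message);
            }
            start = newline + 1;
        }

        // Keep only the incomplete tail for the next call.
        _pending.Clear();
        _pending.Append(text, start, text.Length - start);
        return messages;
    }
}
```

With one LineFramer per connection (for example as a field on ClientSocket), ReadCallback would pass state.buffer and bytesRead to Append and handle each returned string as one XML message.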

I've shared my code here:

Explore One Drive Share

I've been profiling things using my Telerik JustTrace Profiler. I just start the server app then start the client app. This is on my Windows 7 64-bit VS2013 development environment.

Run 1

I see Memory Usage is around 250KB with the Working Set at around 20MB. It seems to tick along nicely, then all of a sudden the memory usage steps up after around 12 minutes, though the timing varies.

It would also appear that after the ~16:45:55 snapshot, when I force a GC, the memory goes up each time I press the button, as opposed to climbing on its own when I leave it running. That might be an issue with Telerik.

Run 2

Then, if I create the byte array inside Send (which is closer to what the service does: it sends an appropriate response string to the client):

public void Send(Socket socket, string data)
{
    byte[] byteData = Encoding.ASCII.GetBytes(data);
    socket.BeginSend(byteData, 0, byteData.Length, 0, SendCallback, socket);
}

We can see the memory going up more:

Which brings me to what is being retained in memory. I see a lot of System.Threading.OverlappedData instances, and I have noticed ExecutionContexts in there. The OverlappedData has a reference to a byte array this time.
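
An OverlappedData instance generally exists for each asynchronous I/O operation that has been started but has not yet completed, and it keeps that operation's buffer referenced. One way to check whether the retained instances correspond to operations that are still outstanding is to count them. This is a minimal diagnostic sketch; PendingOperationCounter is a hypothetical helper, not part of the code above, and it only measures the symptom rather than confirming a cause.

```csharp
using System;
using System.Threading;

// Hypothetical diagnostic helper (not in the original code): tracks how many
// asynchronous operations have been started but not yet completed.
public class PendingOperationCounter
{
    private long _pending;
    private long _peak;

    public long Pending { get { return Interlocked.Read(ref _pending); } }
    public long Peak { get { return Interlocked.Read(ref _peak); } }

    // Call immediately before BeginSend (or BeginReceive).
    public void Started()
    {
        long now = Interlocked.Increment(ref _pending);
        long peak;
        // Lock-free update of the highest value seen so far.
        while (now > (peak = Interlocked.Read(ref _peak)))
        {
            if (Interlocked.CompareExchange(ref _peak, now, peak) == peak)
            {
                break;
            }
        }
    }

    // Call in the callback once EndSend (or EndReceive) has returned or thrown.
    public void Completed()
    {
        Interlocked.Decrement(ref _pending);
    }
}
```

In B2BSocketManager this would mean calling Started() in Send just before socket.BeginSend, calling Completed() in a finally block in SendCallback, and logging Pending every few seconds. If Pending keeps climbing, sends are being queued faster than they complete.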

With paths to GC roots:

I am running the profiling overnight so will hopefully get to add more info to this in the morning. Hopefully somebody can point me in the right direction before that - if I'm doing something wrong and I am too blind/silly to see it.

And here are the results of running overnight: