C# multithreading chat server, handle disconnect

Posted 2019-01-11 15:52

I'm looking for a way to handle a disconnect, because every time I close a client, the server stops working. I get an error message, that it is "unable to read beyond the end of the stream" in this line:

string message = reader.ReadString();

Also I need a way to delete the disconnected client from the clients list. Here is my code:

Server

using System;
using System.Threading;
using System.Net.Sockets;
using System.IO;
using System.Net;
using System.Collections.Generic;

namespace Server
{
    class Server
    {
    public static List<TcpClient> clients = new List<TcpClient>();

    static void Main(string[] args)
    {
        IPAddress ip = IPAddress.Parse("127.0.0.1");
        TcpListener ServerSocket = new TcpListener(ip, 14000);
        ServerSocket.Start();

        Console.WriteLine("Server started.");
        while (true)
        {
            TcpClient clientSocket = ServerSocket.AcceptTcpClient();
            clients.Add(clientSocket);
            handleClient client = new handleClient();
            client.startClient(clientSocket);
        }
    }
}

public class handleClient
{
    TcpClient clientSocket;
    public void startClient(TcpClient inClientSocket)
    {
        this.clientSocket = inClientSocket;
        Thread ctThread = new Thread(Chat);
        ctThread.Start();
    }

    private void Chat()
    {
        while (true)
        {
            BinaryReader reader = new BinaryReader(clientSocket.GetStream());
            while (true)
            {
                string message = reader.ReadString();
                foreach (var client in Server.clients)
                {
                    BinaryWriter writer = new BinaryWriter(client.GetStream());
                    writer.Write(message);
                }
            }
        }
    }
}
}

Client

using System;
using System.Net.Sockets;
using System.IO;
using System.Threading;

namespace Client
{
   class Client
   {
       public static void Write()
       {
        TcpClient client = new TcpClient("127.0.0.1", 14000);
        while (true)
        {
            string str = Console.ReadLine();
            BinaryWriter writer = new BinaryWriter(client.GetStream());
            writer.Write(str);
        }
    }

    public static void Read()
    {
        TcpClient client = new TcpClient("127.0.0.1", 14000);
        while (true)
        {
            BinaryReader reader = new BinaryReader(client.GetStream());
            Console.WriteLine(reader.ReadString());
        }
    }

    static void Main(string[] args)
    {
        Thread Thread = new Thread(Write);
        Thread Thread2 = new Thread(Read);
        Thread.Start();
        Thread2.Start();
    }
}
}

1 answer
迷人小祖宗
#2 · 2019-01-11 16:10

every time I close a client, the server stops working. I get an error message, that it is "unable to read beyond the end of the stream"

This is, in some sense, completely normal. That is, when using BinaryReader, its normal behavior is to throw EndOfStreamException when reaching the end-of-stream.

Why did it reach end-of-stream? Well, because the client disconnected and that's what happens to the stream. At the socket level, what really happens is that a read operation completes with 0 as the count of bytes read. This is the indication that the client has gracefully closed the socket and will not be sending any more data.

In the .NET API, this is translated to the end of the NetworkStream that TcpClient uses to wrap the Socket object that's actually handling the network I/O. And this NetworkStream object is in turn being wrapped by your BinaryReader object. And BinaryReader throws that exception when it reaches the end of the stream.
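The BinaryReader behavior described above can be reproduced without any network at all. The following is only a minimal sketch: it uses a MemoryStream as a stand-in for the NetworkStream, and the class and method names (EndOfStreamDemo, ReadPastEndThrows) are made up for illustration.

```csharp
using System;
using System.IO;
using System.Text;

static class EndOfStreamDemo
{
    // Returns true if reading past the last string throws EndOfStreamException.
    public static bool ReadPastEndThrows()
    {
        var buffer = new MemoryStream();

        // Write exactly one length-prefixed string, leaving the stream open.
        using (var writer = new BinaryWriter(buffer, Encoding.UTF8, true))
        {
            writer.Write("hello");
        }

        buffer.Position = 0;

        using (var reader = new BinaryReader(buffer))
        {
            reader.ReadString(); // consumes "hello"

            try
            {
                reader.ReadString(); // nothing left: this is the "client disconnected" case
                return false;
            }
            catch (EndOfStreamException)
            {
                return true;
            }
        }
    }

    static void Main()
    {
        Console.WriteLine(ReadPastEndThrows());
    }
}
```

With a real socket the only difference is where the end of the stream comes from: the remote endpoint closing its side rather than a buffer running out.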

Note that your code does not actually provide a graceful way for the user to close a client. They will have to use Ctrl+C, or kill the process outright. Using the former has the serendipitous effect of performing a graceful shutdown of the socket, but only because .NET is handling the termination of the process and runs finalizers on your objects, such as the TcpClient object used to connect to the server, and the finalizer calls Socket.Shutdown() to tell the server it's closing.

If you were to kill the process (e.g. using Task Manager), you'd find an IOException was thrown instead. Good network code should always be prepared to see an IOException; networks are unreliable and failures do occur. You want to do something reasonable, such as removing the remote endpoint from your connections, rather than just having the whole program crash.

Now, all that said, just because the EndOfStreamException is "normal", that does not mean the code you posted is correct, or is in any way an example of the right way to do network programming. You have a number of issues:

  1. There is no explicit graceful closure.

    Network I/O provides a normal way to close connections, which involves handshaking on both endpoints to indicate when they are done sending, and when they are done receiving. One endpoint will indicate it's done sending; the other will note this (using the 0-byte-read mentioned above) and then itself indicate it's done both sending and receiving.

    TcpClient and NetworkStream don't expose this directly, but you can use the TcpClient.Client property to get the Socket object to do a better graceful closure, i.e. one endpoint can indicate it's done sending, and still be able to wait until the other endpoint is also done sending.

    Using the TcpClient.Close() method to disconnect is like hanging up the phone on someone without saying "good-bye". Using Socket.Shutdown() is like finishing a phone call with a polite "okay, that's everything I wanted to say…was there anything else?"
  2. You are using BinaryReader but not handling the EndOfStreamException correctly.
  3. Your client uses two connections to communicate with the server.

    Network I/O uses the Socket object, which supports full-duplex communications. There is no need to create a second connection just to do both reading and writing. A single connection suffices, and is better because when you split the send and receive into two connections, then you also need to add something to your protocol so that the server knows those two connections represent a single client (which your code does not actually do).
  4. A client is not removed from the server's list when it disconnects (you noted this in your question).
  5. The client list is not thread-safe.
  6. Your Chat() method has an extra "while (true)" in it.
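The handshake described in item 1 can be seen in isolation with a loopback connection. This is a rough sketch rather than production code: it assumes both endpoints live in one process, uses port 0 so the OS picks a free port, and the HalfCloseDemo class is hypothetical. It works on the Socket objects directly (via TcpClient.Client) so that the receive side is still usable after the send side has been shut down.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;

static class HalfCloseDemo
{
    // Returns the byte count the server's read completes with after the client
    // shuts down its sending side; 'reply' is what the client still received.
    public static int Run(out string reply)
    {
        var listener = new TcpListener(IPAddress.Loopback, 0); // port 0: OS chooses
        listener.Start();
        try
        {
            int port = ((IPEndPoint)listener.LocalEndpoint).Port;
            using (var client = new TcpClient())
            {
                client.Connect(IPAddress.Loopback, port);
                using (TcpClient accepted = listener.AcceptTcpClient())
                {
                    Socket clientSocket = client.Client;
                    Socket serverSocket = accepted.Client;
                    byte[] buffer = new byte[16];

                    // Client: "I'm done sending", but keep receiving.
                    clientSocket.Shutdown(SocketShutdown.Send);

                    // Server: the read completes with 0 bytes, the end-of-stream signal.
                    int count = serverSocket.Receive(buffer);

                    // Server may still send, then declares it is done in both directions.
                    serverSocket.Send(Encoding.ASCII.GetBytes("bye"));
                    serverSocket.Shutdown(SocketShutdown.Both);

                    // Client reads until it, too, sees a 0-byte read.
                    var received = new StringBuilder();
                    int n;
                    while ((n = clientSocket.Receive(buffer)) > 0)
                    {
                        received.Append(Encoding.ASCII.GetString(buffer, 0, n));
                    }

                    reply = received.ToString();
                    return count;
                }
            }
        }
        finally
        {
            listener.Stop();
        }
    }

    static void Main()
    {
        string reply;
        int count = Run(out reply);
        Console.WriteLine($"server read {count} bytes; client then received \"{reply}\"");
    }
}
```

The point of the sketch is the ordering: the side that finishes first uses SocketShutdown.Send and keeps reading, and the side that sees the 0-byte read answers with SocketShutdown.Both.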

I have modified your original example to address all of the above, which I've presented here:

Server Program.cs:

using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;

class Program
{
    private static readonly object _lock = new object();
    private static readonly List<TcpClient> clients = new List<TcpClient>();

    public static TcpClient[] GetClients()
    {
        lock (_lock) return clients.ToArray();
    }

    public static int GetClientCount()
    {
        lock (_lock) return clients.Count;
    }

    public static void RemoveClient(TcpClient client)
    {
        lock (_lock) clients.Remove(client);
    }

    static void Main(string[] args)
    {
        IPAddress ip = IPAddress.Parse("127.0.0.1");
        TcpListener ServerSocket = new TcpListener(ip, 14000);
        ServerSocket.Start();

        Console.WriteLine("Server started.");
        while (true)
        {
            TcpClient clientSocket = ServerSocket.AcceptTcpClient();
            Console.WriteLine($"client connected: {clientSocket.Client.RemoteEndPoint}");
            lock (_lock) clients.Add(clientSocket);
            handleClient client = new handleClient();
            client.startClient(clientSocket);

            Console.WriteLine($"{GetClientCount()} clients connected");
        }
    }
}

Server handleClient.cs:

using System;
using System.IO;
using System.Net.Sockets;
using System.Threading;

public class handleClient
{
    TcpClient clientSocket;

    public void startClient(TcpClient inClientSocket)
    {
        this.clientSocket = inClientSocket;
        Thread ctThread = new Thread(Chat);
        ctThread.Start();
    }

    private void Chat()
    {
        BinaryReader reader = new BinaryReader(clientSocket.GetStream());

        try
        {
            while (true)
            {
                string message = reader.ReadString();
                foreach (var client in Program.GetClients())
                {
                    BinaryWriter writer = new BinaryWriter(client.GetStream());
                    writer.Write(message);
                }
            }
        }
        catch (EndOfStreamException)
        {
            Console.WriteLine($"client disconnecting: {clientSocket.Client.RemoteEndPoint}");
            clientSocket.Client.Shutdown(SocketShutdown.Both);
        }
        catch (IOException e)
        {
            Console.WriteLine($"IOException reading from {clientSocket.Client.RemoteEndPoint}: {e.Message}");
        }

        clientSocket.Close();
        Program.RemoveClient(clientSocket);
        Console.WriteLine($"{Program.GetClientCount()} clients connected");
    }
}

Client Program.cs:

using System;
using System.IO;
using System.Net.Sockets;
using System.Threading;

class Program
{
    private static readonly object _lock = new object();
    private static bool _closed;

    public static void Write(TcpClient client)
    {
        try
        {
            string str;
            SocketShutdown reason = SocketShutdown.Send;

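            // Console.ReadLine() returns null at end of input (e.g. redirected
            // stdin), so treat null the same as an empty line.
            while (!string.IsNullOrEmpty(str = Console.ReadLine()))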
            {
                lock (_lock)
                {
                    BinaryWriter writer = new BinaryWriter(client.GetStream());
                    writer.Write(str);

                    if (_closed)
                    {
                        // Remote endpoint already said they are done sending,
                        // so we're done with both sending and receiving.
                        reason = SocketShutdown.Both;
                        break;
                    }
                }
            }

            client.Client.Shutdown(reason);
        }
        catch (IOException e)
        {
            Console.WriteLine($"IOException writing to socket: {e.Message}");
        }
    }

    public static void Read(TcpClient client)
    {
        try
        {
            while (true)
            {
                try
                {
                    BinaryReader reader = new BinaryReader(client.GetStream());
                    Console.WriteLine(reader.ReadString());
                }
                catch (EndOfStreamException)
                {
                    lock (_lock)
                    {
                        _closed = true;
                        return;
                    }
                }
            }
        }
        catch (IOException e)
        {
            Console.WriteLine($"IOException reading from socket: {e.Message}");
        }
    }

    static void Main(string[] args)
    {
        TcpClient client = new TcpClient("127.0.0.1", 14000);
        Thread writeThread = new Thread(() => Write(client));
        Thread readThread = new Thread(() => Read(client));
        writeThread.Start();
        readThread.Start();

        writeThread.Join();
        readThread.Join();

        client.Close();
        Console.WriteLine("client exiting");
    }
}

Note that I did not, for the most part, address the inconsistent and unconventional naming you've used in your code. The only exception was for the thread variables in the client code, because I really don't like capitalized local variables that exactly match the name of a type.

You have some other problems as well, which the above revision of your code does not address. These include:

  1. You're using BinaryReader. This is in a lot of ways an annoying class to use. I recommend, especially for a chat-server scenario where you're only dealing with text anyway, that you switch to using StreamReader/StreamWriter.
  2. There is improper coupling/separation of concerns. Your Program class has server code, and the server code knows about the Program class. It would be much better to encapsulate both the server and client implementations into their own classes, separate from the main entry-point of the program, and to further decouple the top-level server code with the per-client data structure (use C#'s event to allow the top-level server code to be notified of important events, such as the need to remove a client from the list, without having the per-client data structure have to actually know about the top-level server object, never mind its client list).
  3. You should provide a mechanism to gracefully shut down the server.
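To illustrate item 1: with StreamReader, the end of the stream is reported by ReadLine() returning null rather than by an exception, which makes the read loop simpler. The following is a small sketch under that assumption, again using a MemoryStream in place of a NetworkStream; LineProtocolDemo and ReadAllLines are illustrative names only.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

static class LineProtocolDemo
{
    // Reads newline-terminated messages until the stream ends.
    public static List<string> ReadAllLines(Stream stream)
    {
        var lines = new List<string>();

        using (var reader = new StreamReader(stream, Encoding.UTF8))
        {
            string line;

            // null means end-of-stream; no EndOfStreamException to catch.
            while ((line = reader.ReadLine()) != null)
            {
                lines.Add(line);
            }
        }

        return lines;
    }

    static void Main()
    {
        var stream = new MemoryStream(Encoding.UTF8.GetBytes("hello\nworld\n"));

        foreach (string line in ReadAllLines(stream))
        {
            Console.WriteLine(line);
        }
    }
}
```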

Normally, I would say that these are outside the scope of an answer like this, which is already quite long. I've addressed the immediate concern in your code and then some, and that is nominally sufficient.

However, I've been meaning to write an updated version of the basic network programming example I'd written some years ago, as a sort of "intermediate" example, adding multiple client support, asynchronous operation, and using the latest C# features (like async/await). So, I went ahead and took some time to do that. I guess I'll post it to my blog eventually…that's a whole other project. In the meantime, here's that code (note that this is an entirely from-scratch example…it made more sense to do that than to try to rework the code you had)…

Most of the grunt work for this implementation is in a single class shared by the server and client:

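using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

/// <summary>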
/// Represents a remote end-point for the chat server and clients
/// </summary>
public sealed class ConnectedEndPoint : IDisposable
{
    private readonly object _lock = new object();
    private readonly Socket _socket;
    private readonly StreamReader _reader;
    private readonly StreamWriter _writer;
    private bool _closing;

    /// <summary>
    /// Gets the address of the connected remote end-point
    /// </summary>
    public IPEndPoint RemoteEndPoint { get { return (IPEndPoint)_socket.RemoteEndPoint; } }

    /// <summary>
    /// Gets a <see cref="Task"/> representing the on-going read operation of the connection
    /// </summary>
    public Task ReadTask { get; }

    /// <summary>
    /// Connect to an existing remote end-point (server) and return the
    /// <see cref="ConnectedEndPoint"/> object representing the new connection
    /// </summary>
    /// <param name="remoteEndPoint">The address of the remote end-point to connect to</param>
    /// <param name="readCallback">The callback which will be called when a line of text is read from the newly-created connection</param>
    /// <returns></returns>
    public static ConnectedEndPoint Connect(IPEndPoint remoteEndPoint, Action<ConnectedEndPoint, string> readCallback)
    {
        Socket socket = new Socket(SocketType.Stream, ProtocolType.Tcp);

        socket.Connect(remoteEndPoint);

        return new ConnectedEndPoint(socket, readCallback);
    }

    /// <summary>
    /// Asynchronously accept a new connection from a remote end-point
    /// </summary>
    /// <param name="listener">The listening <see cref="Socket"/> which will accept the connection</param>
    /// <param name="readCallback">The callback which will be called when a line of text is read from the newly-created connection</param>
    /// <returns></returns>
    public static async Task<ConnectedEndPoint> AcceptAsync(Socket listener, Action<ConnectedEndPoint, string> readCallback)
    {
        Socket clientSocket = await Task.Factory.FromAsync(listener.BeginAccept, listener.EndAccept, null);

        return new ConnectedEndPoint(clientSocket, readCallback);
    }

    /// <summary>
    /// Write a line of text to the connection, sending it to the remote end-point
    /// </summary>
    /// <param name="text">The line of text to write</param>
    public void WriteLine(string text)
    {
        lock (_lock)
        {
            if (!_closing)
            {
                _writer.WriteLine(text);
                _writer.Flush();
            }
        }
    }

    /// <summary>
    /// Initiates a graceful closure of the connection
    /// </summary>
    public void Shutdown()
    {
        _Shutdown(SocketShutdown.Send);
    }

    /// <summary>
    /// Implements <see cref="IDisposable.Dispose"/>
    /// </summary>
    public void Dispose()
    {
        _reader.Dispose();
        _writer.Dispose();
        _socket.Close();
    }

    /// <summary>
    /// Constructor. Private -- use one of the factory methods to create new connections.
    /// </summary>
    /// <param name="socket">The <see cref="Socket"/> for the new connection</param>
    /// <param name="readCallback">The callback for reading lines on the new connection</param>
    private ConnectedEndPoint(Socket socket, Action<ConnectedEndPoint, string> readCallback)
    {
        _socket = socket;
        Stream stream = new NetworkStream(_socket);
        _reader = new StreamReader(stream, Encoding.UTF8, false, 1024, true);
        _writer = new StreamWriter(stream, Encoding.UTF8, 1024, true);

        ReadTask = _ConsumeSocketAsync(readCallback);
    }

    private void _Shutdown(SocketShutdown reason)
    {
        lock (_lock)
        {
            if (!_closing)
            {
                _socket.Shutdown(reason);
                _closing = true;
            }
        }
    }

    private async Task _ConsumeSocketAsync(Action<ConnectedEndPoint, string> callback)
    {
        string line;

        while ((line = await _reader.ReadLineAsync()) != null)
        {
            callback(this, line);
        }

        _Shutdown(SocketShutdown.Both);
    }
}

A client program would use that class directly. The server side is encapsulated in another class, found in the same DLL with the above:

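using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;

/// <summary>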
/// Event arguments for the <see cref="ChatServer.Status"/> event
/// </summary>
public class StatusEventArgs : EventArgs
{
    /// <summary>
    /// Gets the status text
    /// </summary>
    public string StatusText { get; }

    /// <summary>
    /// Constructor
    /// </summary>
    /// <param name="statusText">The status text</param>
    public StatusEventArgs(string statusText)
    {
        StatusText = statusText;
    }
}

/// <summary>
/// A server implementing a simple line-based chat server
/// </summary>
public class ChatServer
{
    private readonly object _lock = new object();
    private readonly Socket _listener;
    private readonly List<ConnectedEndPoint> _clients = new List<ConnectedEndPoint>();
    private bool _closing;

    /// <summary>
    /// Gets a task representing the listening state of the server
    /// </summary>
    public Task ListenTask { get; }

    /// <summary>
    /// Raised when the server has status to report
    /// </summary>
    public event EventHandler<StatusEventArgs> Status;

    /// <summary>
    /// Constructor
    /// </summary>
    /// <param name="port">The port number the server should listen on</param>
    public ChatServer(int port)
    {
        _listener = new Socket(SocketType.Stream, ProtocolType.Tcp);
        _listener.Bind(new IPEndPoint(IPAddress.Any, port));
        _listener.Listen(int.MaxValue);
        ListenTask = _ListenAsync();
    }

    /// <summary>
    /// Initiates a shutdown of the chat server.
    /// </summary>
    /// <remarks>This method closes the listening socket, which will subsequently
    /// cause the listening task to inform any connected clients that the server
    /// is shutting down, and to wait for the connected clients to finish a graceful
    /// closure of their connections.
    /// </remarks>
    public void Shutdown()
    {
        _listener.Close();
    }

    private async Task _ListenAsync()
    {
        try
        {
            while (true)
            {
                ConnectedEndPoint client = await ConnectedEndPoint.AcceptAsync(_listener, _ClientReadLine);

                _AddClient(client);
                _CleanupClientAsync(client);
            }
        }
        catch (ObjectDisposedException)
        {
            _OnStatus("Server's listening socket closed");
        }
        catch (IOException e)
        {
            _OnStatus($"Listening socket IOException: {e.Message}");
        }

        await _CleanupServerAsync();
    }

    private async Task _CleanupServerAsync()
    {
        ConnectedEndPoint[] clients;

        lock (_lock)
        {
            _closing = true;
            clients = _clients.ToArray();
        }

        foreach (ConnectedEndPoint client in clients)
        {
            try
            {
                client.WriteLine("Chat server is shutting down");
            }
            catch (IOException e)
            {
                _OnClientException(client, e.Message);
            }
            client.Shutdown();
        }

        // Clients are expected to participate in graceful closure. If they do,
        // this will complete when all clients have acknowledged the shutdown.
        // In a real-world program, may be a good idea to include a timeout in
        // case of network issues or misbehaving/crashed clients. Implementing
        // the timeout is beyond the scope of this proof-of-concept demo code.
        try
        {
            await Task.WhenAll(clients.Select(c => c.ReadTask));
        }
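        catch (IOException)
        {
            // Awaiting Task.WhenAll() rethrows the first faulted task's exception
            // directly, not an AggregateException. The actual exception for each
            // client will have already been reported by _CleanupClientAsync()
        }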
    }

    // Top-level "clean-up" method, which will observe and report all exceptions
    // In real-world code, would probably want to simply log any unexpected exceptions
    // to a log file and then exit the process. Here, we just exit after reporting
    // exception info to caller. In either case, there's no need to observe a Task from
    // this method, and async void simplifies the call (no need to receive and then ignore
    // the Task object just to keep the compiler quiet).
    private async void _CleanupClientAsync(ConnectedEndPoint client)
    {
        try
        {
            await client.ReadTask;
        }
        catch (IOException e)
        {
            _OnClientException(client, e.Message);
        }
        catch (Exception e)
        {
            // Unexpected exceptions are programmer-error. They could be anything, and leave
            // the program in an unknown, possibly corrupt state. The only reasonable disposition
            // is to log, then exit.
            //
            // Full stack-trace, because who knows what this exception was. Will need the
            // stack-trace to do any diagnostic work.
            _OnStatus($"Unexpected client connection exception. {e}");
            Environment.Exit(1);
        }
        finally
        {
            _RemoveClient(client);
            client.Dispose();
        }
    }

    private void _ClientReadLine(ConnectedEndPoint readClient, string text)
    {
        _OnStatus($"Client {readClient.RemoteEndPoint}: \"{text}\"");

        lock (_lock)
        {
            if (_closing)
            {
                return;
            }

            text = $"{readClient.RemoteEndPoint}: {text}";

            foreach (ConnectedEndPoint client in _clients.Where(c => c != readClient))
            {
                try
                {
                    client.WriteLine(text);
                }
                catch (IOException e)
                {
                    _OnClientException(client, e.Message);
                }
            }
        }
    }

    private void _AddClient(ConnectedEndPoint client)
    {
        lock (_lock)
        {
            _clients.Add(client);
            _OnStatus($"added client {client.RemoteEndPoint} -- {_clients.Count} clients connected");
        }
    }

    private void _RemoveClient(ConnectedEndPoint client)
    {
        lock (_lock)
        {
            _clients.Remove(client);
            _OnStatus($"removed client {client.RemoteEndPoint} -- {_clients.Count} clients connected");
        }
    }

    private void _OnStatus(string statusText)
    {
        Status?.Invoke(this, new StatusEventArgs(statusText));
    }

    private void _OnClientException(ConnectedEndPoint client, string message)
    {
        _OnStatus($"Client {client.RemoteEndPoint} IOException: {message}");
    }
}

And that is, for the most part, all you need. The DLL code above is referenced (in my example) by two different programs, a server and a client.

Here's the server:

using System;
using System.Threading.Tasks;
using static System.Console;

class Program
{
    private const int _kportNumber = 5678;

    static void Main(string[] args)
    {
        ChatServer server = new ChatServer(_kportNumber);

        server.Status += (s, e) => WriteLine(e.StatusText);

        Task serverTask = _WaitForServer(server);

        WriteLine("Press return to shutdown server...");
        ReadLine();

        server.Shutdown();
        serverTask.Wait();
    }

    private static async Task _WaitForServer(ChatServer server)
    {
        try
        {
            await server.ListenTask;
        }
        catch (Exception e)
        {
            WriteLine($"Server exception: {e}");
        }
    }
}

And here's the client:

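using System;
using System.IO;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using static System.Console;

class Program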
{
    private const int _kportNumber = 5678;

    static void Main(string[] args)
    {
        IPEndPoint remoteEndPoint = new IPEndPoint(IPAddress.Loopback, _kportNumber);
        ConnectedEndPoint server = ConnectedEndPoint.Connect(remoteEndPoint, (c, s) => WriteLine(s));

        _StartUserInput(server);
        _SafeWaitOnServerRead(server).Wait();
    }

    private static void _StartUserInput(ConnectedEndPoint server)
    {
        // Get user input in a new thread, so main thread can handle waiting
        // on connection.
        new Thread(() =>
        {
            try
            {
                string line;

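                // ReadLine() returns null at end of input; without the null check
                // this loop would spin forever sending blank lines.
                while (!string.IsNullOrEmpty(line = ReadLine()))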
                {
                    server.WriteLine(line);
                }

                server.Shutdown();
            }
            catch (IOException e)
            {
                WriteLine($"Server {server.RemoteEndPoint} IOException: {e.Message}");
            }
            catch (Exception e)
            {
                WriteLine($"Unexpected server exception: {e}");
                Environment.Exit(1);
            }
        })
        {
            // Setting IsBackground means this thread won't keep the
            // process alive. So, if the connection is closed by the server,
            // the main thread can exit and the process as a whole will still
            // be able to exit.
            IsBackground = true
        }.Start();
    }

    private static async Task _SafeWaitOnServerRead(ConnectedEndPoint server)
    {
        try
        {
            await server.ReadTask;
        }
        catch (IOException e)
        {
            WriteLine($"Server {server.RemoteEndPoint} IOException: {e.Message}");
        }
        catch (Exception e)
        {
            // Should never happen. It's a bug in this code if it does.
            WriteLine($"Unexpected server exception: {e}");
        }
    }
}

In my opinion, one of the most important things for you to note in the above is that the ConnectedEndPoint and ChatServer classes have zero dependency on the classes which use them. Through the use of callback delegates and events, the code that depends on these classes is able to interact bi-directionally without these supporting classes having to know about the types that code resides in (see "inversion of control", which this is a variation of).

The more you can make your code relationships look like a tree with only single-direction references, the easier it will be to write the code, and to maintain it later.

Note: I used both events and callback delegates for the sake of illustration. Either approach works fine on its own. The main trade-offs are complexity vs. flexibility. Using events makes the code more flexible — event handlers can be added and removed as necessary — but if one implements events using the .NET convention of a method signature with a sender and an EventArgs parameter, it is somewhat more "heavy-weight" than just passing a simple callback delegate when creating the object in question. I put an example of each in the code, and you can decide which approaches you prefer in which situations.
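To make that trade-off concrete, here is a stripped-down sketch that offers both notification styles on one class. The types here (LineSource, LineEventArgs, NotificationDemo) are hypothetical and exist only for this comparison; they are not part of the chat code above.

```csharp
using System;
using System.Collections.Generic;

public sealed class LineEventArgs : EventArgs
{
    public string Line { get; }

    public LineEventArgs(string line)
    {
        Line = line;
    }
}

public sealed class LineSource
{
    private readonly Action<string> _callback;

    // Event style: any number of handlers, attachable and detachable later.
    public event EventHandler<LineEventArgs> LineReceived;

    // Callback style: one delegate, fixed when the object is created.
    public LineSource(Action<string> callback)
    {
        _callback = callback;
    }

    public void Publish(string line)
    {
        _callback?.Invoke(line);
        LineReceived?.Invoke(this, new LineEventArgs(line));
    }
}

static class NotificationDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var source = new LineSource(line => log.Add("callback: " + line));
        EventHandler<LineEventArgs> handler = (s, e) => log.Add("event: " + e.Line);

        source.LineReceived += handler;
        source.Publish("one");

        // The event handler can be removed; the constructor callback cannot.
        source.LineReceived -= handler;
        source.Publish("two");

        return log;
    }

    static void Main()
    {
        foreach (string entry in Run())
        {
            Console.WriteLine(entry);
        }
    }
}
```

In neither style does LineSource know anything about the code that consumes its notifications, which is the decoupling property discussed above.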

You'll also note that the above makes heavy use of C#'s asynchronous features. At first, this may make the code seem harder to read. But in fact, it's actually much easier to get everything working using these features, than if I were to try to use the older BeginXXX()/EndXXX() methods or, heaven forbid, dedicate a single thread to each connection (which scales very poorly as the client count goes up). It's absolutely worth getting used to thinking of operations which are inherently asynchronous, such as network I/O, in this way.
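As one example of how the async style composes, the timeout that the comments in _CleanupServerAsync() leave as an exercise could be approached with Task.WhenAny() and Task.Delay(). This is only a sketch of one possible approach, not something the code above does; the helper name WaitAllWithTimeoutAsync and the ShutdownTimeoutDemo class are invented for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

static class ShutdownTimeoutDemo
{
    // Returns true if every task finished before the timeout elapsed.
    public static async Task<bool> WaitAllWithTimeoutAsync(IEnumerable<Task> tasks, TimeSpan timeout)
    {
        Task all = Task.WhenAll(tasks);
        Task first = await Task.WhenAny(all, Task.Delay(timeout));

        if (first != all)
        {
            return false; // timed out; the tasks themselves keep running
        }

        await all; // observe (and rethrow) any exception from the tasks
        return true;
    }

    static void Main()
    {
        // A task that never completes stands in for a client that never
        // acknowledges the shutdown.
        var never = new TaskCompletionSource<bool>();

        bool finished = WaitAllWithTimeoutAsync(new Task[] { never.Task }, TimeSpan.FromMilliseconds(100))
            .GetAwaiter().GetResult();

        Console.WriteLine(finished ? "all clients finished" : "timed out waiting for clients");
    }
}
```

On a timeout the caller would still need to decide what to do with the unresponsive connections, for example disposing them outright.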
