I am writing an asynchronous chat server in C++ using the Boost library. Almost everything works fine.
There are two ways for a client to disconnect:
- by pressing Ctrl + C (killing the client process)
- by entering "exit".
The former is OK. However, the latter has a problem. If a client disconnects with "exit", the next message, sent by another client, appears without the first several characters. After that it's OK.
For example: several clients are chatting. One of them disconnects with "exit". After that, another client sends "0123456789abcdefghijk" and all clients receive only "abcdefghijk". I don't know where the problem is; I guess it has something to do with streambuf. I found a similar (almost identical) problem, but in C#.
Here's the code:
#include<iostream>
#include<list>
#include<map>
#include<queue>
#include<vector>
#include<cstdlib>
#include<ctime>
#include<boost/thread.hpp>
#include<boost/bind.hpp>
#include<boost/asio.hpp>
#include<boost/asio/ip/tcp.hpp>
using namespace std;
using namespace boost::asio;
using namespace boost::asio::ip;
// Shared-ownership handles used throughout the handlers below.
typedef boost::shared_ptr<tcp::socket> socket_ptr;
typedef boost::shared_ptr<string> string_ptr;
typedef boost::shared_ptr< list<socket_ptr> > clientList_ptr;
typedef boost::shared_ptr< list<string> > nameList_ptr;
// Fixed texts written to stdout / stderr by the server.
const string waitingMsg("Waiting for clients...\n");
const string totalClientsMsg("Total clients: ");
const string errorReadingMsg("Error on reading: ");
const string errorWritingMsg("Error on writing: ");
// Error value that the read handlers do not log; judging by the name it
// stands for "end of file" (peer closed the connection).
const int EOF_ERROR_CODE = 2;
// Number of threads main() starts to run the io_service.
const int THREADS = 1;
// The single io_service every asynchronous operation in this file runs on.
io_service service;
// Accepts IPv4 TCP connections on port 30001.
tcp::acceptor acceptor(service, tcp::endpoint(tcp::v4(), 30001));
// Mutex guarding shared state; locked in accepted() and extract().
boost::mutex mtx;
// Sockets of the clients that have completed the name handshake.
clientList_ptr clientList(new list<socket_ptr>);
// Names currently taken by connected clients.
nameList_ptr nameList(new list<string>);
// Global input buffer that extract() pulls lines from.
boost::asio::streambuf buff;
// NOTE(review): not referenced by any code visible in this listing.
time_t timer;
// Forward declarations of the handlers and helpers defined below.
void ready();
void accepting();
void askForName(socket_ptr clientSock, const boost::system::error_code& error);
void receiveName(socket_ptr clientSock, const boost::system::error_code& error,
std::size_t bytes_transferred);
void identify(socket_ptr clientSock, const boost::system::error_code& error, std::size_t bytes_transferred);
void accepted(socket_ptr clientSock, string_ptr name);
void receiveMessage(socket_ptr clientSock, string_ptr name);
void received(socket_ptr clientSock, string_ptr name, const boost::system::error_code& error,
std::size_t bytes_transferred);
bool extract(string_ptr message, std::size_t bytes_transferred);
// NOTE(review): the parameter is named clientSock here, but the definition
// names it `message` and inspects it as text.
bool clientSentExit(string_ptr clientSock);
void disconnectClient(socket_ptr clientSock, string_ptr name, const boost::system::error_code& error);
void writeMessage(socket_ptr clientSock, string_ptr message);
void responseSent(const boost::system::error_code& error);
void notification(socket_ptr sock, string_ptr name, const string headOfMsg, const string tailOfMsg);
int main(int argc, char* argv[])
{
try
{
vector<boost::shared_ptr<boost::thread> > threads;
ready();
for (int i = 0; i < THREADS; i++)
{
boost::shared_ptr <boost::thread> t(new boost::thread(boost::bind(&io_service::run, &service)));
threads.push_back(t);
}
for (int i = 0; i < THREADS; i++)
{
threads[i]->join();
}
}
catch (std::exception& error)
{
cerr << error.what() << endl;
}
return 0;
}
/// Prints the "waiting for clients" banner on stdout and queues the first
/// asynchronous accept.
void ready()
{
    std::cout << waitingMsg;
    accepting();
}
void accepting()
{
socket_ptr clientSock(new tcp::socket(service));
acceptor.async_accept(*clientSock, boost::bind(&askForName, clientSock, boost::asio::placeholders::error));
}
/// Completion handler for async_accept.
///
/// On success, sends the name prompt to the new client and chains to
/// receiveName. On failure, logs the error and does not touch the socket.
/// In both cases another accept is queued so the server keeps listening.
///
/// @param sock   socket the accept was issued on
/// @param error  result of the accept operation
void askForName(socket_ptr sock, const boost::system::error_code& error)
{
    // Static so the bytes outlive the asynchronous write. A std::string is
    // used instead of a string literal because buffer() applied to a char
    // array covers the whole array, including the terminating '\0', which
    // would then be transmitted to the client.
    static const string namePrompt("Please, enter your name:\n");

    if (error)
    {
        cerr << "Error on accepting: " << error.message() << endl;
    }
    else
    {
        boost::asio::async_write(*sock, boost::asio::buffer(namePrompt),
            boost::bind(&receiveName, sock, boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred));
    }

    // Keep accepting regardless of how this accept finished.
    accepting();
}
void receiveName(socket_ptr sock, const boost::system::error_code& error,
std::size_t bytes_transferred)
{
if (error)
{
cerr << errorWritingMsg << error.message() << endl;
}
boost::asio::async_read_until(*sock, buff, '\n',
boost::bind(&identify, sock, boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void identify(socket_ptr sock, const boost::system::error_code& error,
std::size_t bytes_transferred)
{
if(error)
{
if (error.value() != EOF_ERROR_CODE)
{
cerr << errorReadingMsg << error.message() << endl;
}
return;
}
string_ptr name(new string(""));
if (!extract(name, bytes_transferred))
{
return;
}
if (find(nameList->begin(), nameList->end(), *name) != nameList->end())
{
boost::asio::async_write(*sock, buffer("This name is already in use! Please, select another name:\n"),
boost::bind(&receiveName, sock, boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
return;
}
nameList->emplace_back(*name);
accepted(sock, name);
}
void accepted(socket_ptr sock, string_ptr name)
{
mtx.lock();
clientList->emplace_back(sock);
mtx.unlock();
notification(sock, name, "New client: ", " joined. ");
receiveMessage(sock, name);
}
void receiveMessage(socket_ptr sock, string_ptr name)
{
boost::asio::async_read_until(*sock, buff, '\n', boost::bind(&received, sock, name, boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void received(socket_ptr sock, string_ptr name, const boost::system::error_code& error,
std::size_t bytes_transferred)
{
if(error)
{
if (error.value() != EOF_ERROR_CODE)
{
cerr << errorReadingMsg << error.message() << endl;
}
disconnectClient(sock, name, error);
return;
}
if(!clientList->empty())
{
//mtx.lock();
string_ptr message(new string(""));
if(!extract(message, bytes_transferred))
{
//mtx.unlock();
disconnectClient(sock, name, error);
return;
}
*message = *name + ": " + *message + "\n";
cout << "ChatLog: " << *message << endl;
writeMessage(sock, message);
receiveMessage(sock, name);
//mtx.unlock();
}
}
bool extract(string_ptr message, std::size_t bytes_transferred)
{
mtx.lock();
buff.commit(bytes_transferred);
std::istream istrm(&buff);
//mtx.unlock();
std::getline(istrm, *message);
buff.consume(buff.size());
string_ptr msgEndl(new string(*message + "\n"));
mtx.unlock();
if(clientSentExit(msgEndl))
{
return false;
}
return true;
}
/// Reports whether the text in *message begins with the five characters
/// "exit\n".
bool clientSentExit(string_ptr message)
{
    static const char exitCommand[] = "exit\n";
    const std::size_t commandLength = sizeof(exitCommand) - 1;  // drop the NUL
    const int difference = message->compare(0, commandLength, exitCommand);
    return difference == 0;
}
/// Shuts down and closes a client's socket, removes the client from the
/// socket and name lists, and announces the departure to the other clients.
///
/// @param sock   socket of the client being disconnected
/// @param name   the client's registered name
/// @param error  the error (if any) that triggered the disconnect; only used
///               to seed the local error_code that shutdown/close overwrite
void disconnectClient(socket_ptr sock, string_ptr name, const boost::system::error_code& error)
{
    boost::system::error_code ec = error;

    sock->shutdown(tcp::socket::shutdown_both, ec);
    if (ec)
    {
        cerr << "Error on shutting: " << ec.message() << endl;
    }

    sock->close(ec);
    if (ec)
    {
        cerr << "Error on closing: " << ec.message() << endl;
    }

    // list::remove is a no-op when the entry is absent. Erasing the result of
    // a failed find() (the end iterator) would be undefined behaviour.
    clientList->remove(sock);
    nameList->remove(*name);

    notification(sock, name, "", " disconnected. ");
}
/// Sends *message asynchronously to every open client socket except `sock`.
///
/// @param sock     the originating client, which is skipped
/// @param message  text to broadcast
void writeMessage(socket_ptr sock, string_ptr message)
{
    for (const auto& cliSock : *clientList)
    {
        if (cliSock->is_open() && cliSock != sock)
        {
            // The handler captures `message` so the string stays alive until
            // the write has completed. async_write only stores a pointer to
            // the bytes, and the caller's string_ptr may be gone by then.
            boost::asio::async_write(*cliSock, boost::asio::buffer(*message),
                [message](const boost::system::error_code& ec, std::size_t /*bytes*/)
                {
                    responseSent(ec);
                });
        }
    }
}
/// Completion handler for broadcast writes: logs a failed write on stderr and
/// does nothing on success.
void responseSent(const boost::system::error_code& error)
{
    if (!error)
    {
        return;
    }
    cerr << "Error on writing: " << error.message() << endl;
}
/// Builds the service message headOfMsg + name + tailOfMsg, prints it on
/// stdout together with the current client count, broadcasts it (with a
/// trailing newline) to every client except `sock`, and reprints the
/// "waiting" banner.
void notification(socket_ptr sock, string_ptr name, const string headOfMsg, const string tailOfMsg)
{
    string_ptr serviceMsg(new string(headOfMsg));
    serviceMsg->append(*name);
    serviceMsg->append(tailOfMsg);

    cout << *serviceMsg << totalClientsMsg << clientList->size() << endl;

    serviceMsg->push_back('\n');
    writeMessage(sock, serviceMsg);

    cout << waitingMsg;
}
Interestingly, I have a similar synchronous server that uses streambuf in the same way, and it has no such problem.