I am writing a simple tcp client to send and receive single lines of text. The asynchronous operations are handled by std::future in order to faciliate blocking queries with timeouts. Unfortunately, my test application crashes with an access violation when destructing the server object.
Here is my code:
TCPClient.hpp
#ifndef __TCPCLIENT_H__
#define __TCPCLIENT_H__
#include <boost/asio.hpp>
#include <boost/asio/use_future.hpp>
#include <memory>
#include <vector>
#include <future>
#include <thread>
#include <chrono>
#include <iostream>
#include <iterator>
using namespace boost::asio;
class TCPClient {
public:
TCPClient();
~TCPClient();
void connect(const std::string& address, const std::string& port);
void disconnect();
std::string sendMessage(const std::string& msg);
private:
boost::asio::io_service ioservice;
boost::asio::io_service::work work;
std::thread t;
std::unique_ptr<boost::asio::ip::tcp::socket> socket;
};
inline TCPClient::TCPClient() : ioservice(), work(ioservice) {
t = std::thread([&]() {
try {
ioservice.run();
}
catch (const boost::system::system_error& e) {
std::cerr << e.what() << std::endl;
}
});
}
inline TCPClient::~TCPClient() {
disconnect();
ioservice.stop();
if (t.joinable()) t.join();
}
inline void TCPClient::connect(const std::string& address, const std::string& port) {
socket.reset(new ip::tcp::socket(ioservice));
ip::tcp::resolver::query query(address, port);
std::future<ip::tcp::resolver::iterator> conn_result = async_connect(*socket, ip::tcp::resolver(ioservice).resolve(query), use_future);
if (conn_result.wait_for(std::chrono::seconds(6)) != std::future_status::timeout) {
conn_result.get(); // throws boost::system::system_error if the operation fails
}
else {
//socket->close();
// throw timeout_error("Timeout");
throw std::exception("timeout");
}
}
inline void TCPClient::disconnect() {
if (socket) {
try {
socket->shutdown(ip::tcp::socket::shutdown_both);
std::cout << "socket points to " << std::addressof(*socket) << std::endl;
socket->close();
}
catch (const boost::system::system_error& e) {
// ignore
std::cerr << "ignored error " << e.what() << std::endl;
}
}
}
inline std::string TCPClient::sendMessage(const std::string& msg) {
auto time_over = std::chrono::system_clock::now() + std::chrono::seconds(4);
/*
// Doesn't affect the error
std::future<size_t> write_fut = boost::asio::async_write(*socket, boost::asio::buffer(msg), boost::asio::use_future);
try {
write_fut.get();
}
catch (const boost::system::system_error& e) {
std::cerr << e.what() << std::endl;
}
*/
boost::asio::streambuf response;
std::future<std::size_t> read_fut = boost::asio::async_read_until(*socket, response, '\n', boost::asio::use_future);
if (read_fut.wait_until(time_over) != std::future_status::timeout) {
std::cout << "read " << read_fut.get() << " bytes" << std::endl;
return std::string(std::istreambuf_iterator<char>(&response), std::istreambuf_iterator<char>());
}
else {
std::cout << "socket points to " << std::addressof(*socket) << std::endl;
throw std::exception("timeout");
}
}
#endif
main.cpp
#include <iostream>
#include "TCPClient.hpp"
int main(int argc, char* argv[]) {
TCPClient client;
try {
client.connect("localhost", "27015");
std::cout << "Response: " << client.sendMessage("Hello!") << std::endl;
}
catch (const boost::system::system_error& e) {
std::cerr << e.what() << std::endl;
}
catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
}
system("pause");
return 0;
}
The output is "timeout" as expected (test server sends no data on purpose), but ioservice.run()
crashes immediately (access violation) after closing the socket in TCPClient::disconnect()
. Am I doing some memory mismanagment here?
Compiler is MSVC 12.0.31101.00 Update 4 (Visual Studio 2013)
recvmsg
is receiving into a buffer (streambuf
) that was freed after throwing the exception in TCPClient::sendMessage
(line 105, end of scope).
You forgot to cancel the asynchronous operation (async_read_until
) started in line 97. Fix it:
else {
socket->cancel(); // ADDED
std::cout << "socket points to " << std::addressof(*socket) << std::endl;
throw std::runtime_error("timeout");
}
Or even, just
socket.reset(); // ADDED
Same goes for other timeout paths.
The other answer addresses what went wrong.
On a higher level, though, you're using futures, just to immediately await their return.
It struck me that this is actually not asynchrony at all, and you should be able to do:
- without threading, and joining
- without
.stop()
- without
work
and work.reset()
- without a explicit constructor or destructor
- without the
unique_ptr<socket>
and the lifetime management that came with it
- without the
future<>
, and the .get()
and future_status
checking that come with it
All in all, you can do a lot simpler, e.g. using a simple helper function like this:
class TCPClient {
public:
void disconnect();
void connect(const std::string& address, const std::string& port);
std::string sendMessage(const std::string& msg);
private:
using error_code = boost::system::error_code;
template<typename AllowTime> void await_operation(AllowTime const& deadline_or_duration) {
using namespace boost::asio;
ioservice.reset();
{
high_resolution_timer tm(ioservice, deadline_or_duration);
tm.async_wait([this](error_code ec) { if (ec != error::operation_aborted) socket.cancel(); });
ioservice.run_one();
}
ioservice.run();
}
boost::asio::io_service ioservice { };
boost::asio::ip::tcp::socket socket { ioservice };
};
E.g. connect(...)
used to be:
socket.reset(new ip::tcp::socket(ioservice));
ip::tcp::resolver::query query(address, port);
std::future<ip::tcp::resolver::iterator> conn_result = async_connect(*socket, ip::tcp::resolver(ioservice).resolve(query), use_future);
if (conn_result.wait_for(std::chrono::seconds(6)) != std::future_status::timeout) {
conn_result.get(); // throws boost::system::system_error if the operation fails
}
else {
socket->cancel();
// throw timeout_error("Timeout");
throw std::runtime_error("timeout");
}
It now becomes:
async_connect(socket,
ip::tcp::resolver(ioservice).resolve({address, port}),
[&](error_code ec, ip::tcp::resolver::iterator it) { if (ec) throw std::runtime_error(ec.message()); });
await_operation(std::chrono::seconds(6));
Like wise, sendMessage
becomes:
streambuf response;
async_read_until(socket, response, '\n', [&](error_code ec, size_t bytes_read) {
if (ec) throw std::runtime_error(ec.message());
std::cout << "read " << bytes_read << " bytes" << std::endl;
});
await_operation(std::chrono::system_clock::now() + std::chrono::seconds(4));
return {std::istreambuf_iterator<char>(&response), {}};
Note these are significantly simpler. Note, also, that correct exception messages are now thrown, depending on the cause of the failures.
Full Demo
Live On Coliru
#ifndef __TCPCLIENT_H__
#define __TCPCLIENT_H__
#include <boost/asio.hpp>
#include <boost/asio/high_resolution_timer.hpp>
#include <iostream>
class TCPClient {
public:
void disconnect();
void connect(const std::string& address, const std::string& port);
std::string sendMessage(const std::string& msg);
private:
using error_code = boost::system::error_code;
template<typename AllowTime> void await_operation(AllowTime const& deadline_or_duration) {
using namespace boost::asio;
ioservice.reset();
{
high_resolution_timer tm(ioservice, deadline_or_duration);
tm.async_wait([this](error_code ec) { if (ec != error::operation_aborted) socket.cancel(); });
ioservice.run_one();
}
ioservice.run();
}
boost::asio::io_service ioservice { };
boost::asio::ip::tcp::socket socket { ioservice };
};
inline void TCPClient::connect(const std::string& address, const std::string& port) {
using namespace boost::asio;
async_connect(socket,
ip::tcp::resolver(ioservice).resolve({address, port}),
[&](error_code ec, ip::tcp::resolver::iterator it) { if (ec) throw std::runtime_error(ec.message()); });
await_operation(std::chrono::seconds(6));
}
inline void TCPClient::disconnect() {
using namespace boost::asio;
if (socket.is_open()) {
try {
socket.shutdown(ip::tcp::socket::shutdown_both);
socket.close();
}
catch (const boost::system::system_error& e) {
// ignore
std::cerr << "ignored error " << e.what() << std::endl;
}
}
}
inline std::string TCPClient::sendMessage(const std::string& msg) {
using namespace boost::asio;
streambuf response;
async_read_until(socket, response, '\n', [&](error_code ec, size_t bytes_read) {
if (ec) throw std::runtime_error(ec.message());
std::cout << "read " << bytes_read << " bytes" << std::endl;
});
await_operation(std::chrono::system_clock::now() + std::chrono::seconds(4));
return {std::istreambuf_iterator<char>(&response), {}};
}
#endif
#include <iostream>
//#include "TCPClient.hpp"
int main(/*int argc, char* argv[]*/) {
TCPClient client;
try {
client.connect("127.0.0.1", "27015");
std::cout << "Response: " << client.sendMessage("Hello!") << std::endl;
}
catch (const boost::system::system_error& e) {
std::cerr << e.what() << std::endl;
}
catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
}
}
BONUS
If you want even more convenience, have a generalized callback handler that just raises the exception:
struct raise {
template <typename... A> void operator()(error_code ec, A...) const {
if (ec) throw std::runtime_error(ec.message());
}
};
Now, the bodies become even simpler in absense of lambdas:
inline void TCPClient::connect(const std::string& address, const std::string& port) {
async_connect(socket, ip::tcp::resolver(ioservice).resolve({address, port}), raise());
await_operation(std::chrono::seconds(6));
}
inline std::string TCPClient::sendMessage(const std::string& msg) {
streambuf response;
async_read_until(socket, response, '\n', raise());
await_operation(std::chrono::system_clock::now() + std::chrono::seconds(4));
return {std::istreambuf_iterator<char>(&response), {}};
}
See the adapted demo: Live On Coliru too