First, I have read all related questions listed.
They say, "you must have an existing shared_ptr to this before you can use shared_from_this." As far as I can see, there is no way I am violating that condition. I create the instance of Foo as a shared_ptr and enforced that it is always created as a shared_ptr. I then, stored the shared_ptr in a collection. Yet, I still get the bad_weak_ptr exception when shared_from_this is called.
#pragma once
#include <memory>
#include <vector>
//--------------------------------------------------------------------
class Foo : std::enable_shared_from_this<Foo>
{
public:
typedef std::shared_ptr<Foo> SharedPtr;
// Ensure all instances are created as shared_ptr in order to fulfill requirements for shared_from_this
static Foo::SharedPtr Create()
{
return Foo::SharedPtr(new Foo());
};
Foo(const Foo &) = delete;
Foo(Foo &&) = delete;
Foo & operator = (const Foo &) = delete;
Foo & operator = (Foo &&) = delete;
~Foo() {};
// We have to defer the start until we are fully constructed because we share_from_this()
void Start()
{
DoStuff();
}
private:
Foo() {}
void DoStuff()
{
auto self(shared_from_this());
}
};
//--------------------------------------------------------------------
int main()
{
std::vector<Foo::SharedPtr> foos;
Foo::SharedPtr foo = Foo::Create();
foos.emplace_back(foo);
foo->Start();
return 0;
}
You must inherit enable_shared_from_this
with public
specifier according to
Publicly inheriting from std::enable_shared_from_this provides the type T with a member function shared_from_this.
from http://en.cppreference.com/w/cpp/memory/enable_shared_from_this.
So write
class Foo : public std::enable_shared_from_this<Foo>
First off, you start the threads before ever posting work, so the io_service::run()
is prone to complete before DoAccept
is actually done.
Next, the base class must be PUBLIC for enable_shared_from_this to work:
class Connection : public std::enable_shared_from_this<Connection> {
Working self-contained code:
#include <iostream>
#include <mutex>
namespace SomeNamespace{
struct Logger {
enum { LOGGER_SEVERITY_INFO };
void Log(std::string const& msg, std::string const& file, unsigned line, int level) const {
static std::mutex mx;
std::lock_guard<std::mutex> lk(mx);
std::cout << file << ":" << line << " level:" << level << " " << msg << "\n";
}
template <typename... Args>
void LogF(std::string const& msg, Args const&... args) const {
static std::mutex mx;
std::lock_guard<std::mutex> lk(mx);
static char buf[2048];
snprintf(buf, sizeof(buf)-1, msg.c_str(), args...);
std::cout << buf << "\n";
}
static Logger &GetInstance() {
static Logger This;
return This;
}
};
} // namespace Somenamespace
#include <boost/asio.hpp>
#include <atomic>
#include <condition_variable>
#include <memory>
//--------------------------------------------------------------------
class ConnectionManager;
//--------------------------------------------------------------------
class Connection : public std::enable_shared_from_this<Connection> {
public:
typedef std::shared_ptr<Connection> SharedPtr;
// Ensure all instances are created as shared_ptr in order to fulfill requirements for shared_from_this
static Connection::SharedPtr Create(ConnectionManager *connectionManager, boost::asio::ip::tcp::socket &socket);
Connection(const Connection &) = delete;
Connection(Connection &&) = delete;
Connection &operator=(const Connection &) = delete;
Connection &operator=(Connection &&) = delete;
~Connection();
// We have to defer the start until we are fully constructed because we share_from_this()
void Start();
void Stop();
void Send(const std::vector<char> &data);
private:
ConnectionManager *m_owner;
boost::asio::ip::tcp::socket m_socket;
std::atomic<bool> m_stopped;
boost::asio::streambuf m_receiveBuffer;
mutable std::mutex m_sendMutex;
std::shared_ptr<std::vector<boost::asio::const_buffer> > m_sendBuffers;
bool m_sending;
std::vector<char> m_allReadData; // for testing
Connection(ConnectionManager *connectionManager, boost::asio::ip::tcp::socket socket);
void DoReceive();
void DoSend();
};
//--------------------------------------------------------------------
//#include "Connection.h"
//#include "ConnectionManager.h"
//**ConnectionManager.h **
//#pragma once
//#include "Connection.h"
// Boost Includes
#include <boost/asio.hpp>
// Standard Includes
#include <thread>
#include <vector>
//--------------------------------------------------------------------
class ConnectionManager {
public:
ConnectionManager(unsigned port, size_t numThreads);
ConnectionManager(const ConnectionManager &) = delete;
ConnectionManager(ConnectionManager &&) = delete;
ConnectionManager &operator=(const ConnectionManager &) = delete;
ConnectionManager &operator=(ConnectionManager &&) = delete;
~ConnectionManager();
void Start();
void Stop();
void OnConnectionClosed(Connection::SharedPtr connection);
protected:
boost::asio::io_service m_io_service;
boost::asio::ip::tcp::acceptor m_acceptor;
boost::asio::ip::tcp::socket m_listenSocket;
std::vector<std::thread> m_threads;
mutable std::mutex m_connectionsMutex;
std::vector<Connection::SharedPtr> m_connections;
void IoServiceThreadProc();
void DoAccept();
};
//--------------------------------------------------------------------
#include <boost/bind.hpp>
#include <algorithm>
//--------------------------------------------------------------------
Connection::SharedPtr Connection::Create(ConnectionManager *connectionManager, boost::asio::ip::tcp::socket &socket) {
return Connection::SharedPtr(new Connection(connectionManager, std::move(socket)));
}
//--------------------------------------------------------------------
Connection::Connection(ConnectionManager *connectionManager, boost::asio::ip::tcp::socket socket)
: m_owner(connectionManager), m_socket(std::move(socket)), m_stopped(false), m_receiveBuffer(), m_sendMutex(),
m_sendBuffers(), m_sending(false), m_allReadData() {}
//--------------------------------------------------------------------
Connection::~Connection() {
// Boost uses RAII, so we don't have anything to do. Let thier destructors take care of business
}
//--------------------------------------------------------------------
void Connection::Start() { DoReceive(); }
//--------------------------------------------------------------------
void Connection::Stop() {
// The entire connection class is only kept alive, because it is a shared pointer and always has a ref count
// as a consequence of the outstanding async receive call that gets posted every time we receive.
// Once we stop posting another receive in the receive handler and once our owner release any references to
// us, we will get destroyed.
m_stopped = true;
m_owner->OnConnectionClosed(shared_from_this());
}
//--------------------------------------------------------------------
void Connection::Send(const std::vector<char> &data) {
std::lock_guard<std::mutex> lock(m_sendMutex);
// If the send buffers do not exist, then create them
if (!m_sendBuffers) {
m_sendBuffers = std::make_shared<std::vector<boost::asio::const_buffer> >();
}
// Copy the data to be sent to the send buffers
m_sendBuffers->emplace_back(boost::asio::buffer(data));
DoSend();
}
//--------------------------------------------------------------------
void Connection::DoSend() {
// According to the boost documentation, we cannot issue an async_write while one is already outstanding
//
// If that is the case, it is OK, because we've added the data to be sent to a new set of buffers back in
// the Send method. Notice how the original buffer is moved, so therefore will be null below and how Send
// will create new buffers and accumulate data to be sent until we complete in the lamda
//
// When we complete in the lamda, if we have any new data to be sent, we call DoSend once again.
//
// It is important though, that DoSend is only called from the lambda below and the Send method.
if (!m_sending && m_sendBuffers) {
m_sending = true;
auto copy = std::move(m_sendBuffers);
auto self(shared_from_this());
boost::asio::async_write(m_socket, *copy,
[self, copy](const boost::system::error_code &errorCode, size_t bytes_transferred) {
std::lock_guard<std::mutex> lock(self->m_sendMutex);
self->m_sending = false;
if (errorCode) {
// An error occurred
return;
}
self->DoSend();
});
}
}
//--------------------------------------------------------------------
void Connection::DoReceive() {
SomeNamespace::Logger::GetInstance().Log(__PRETTY_FUNCTION__, __FILE__, __LINE__, SomeNamespace::Logger::LOGGER_SEVERITY_INFO);
auto self(shared_from_this()); // ***EXCEPTION HERE****
boost::asio::async_read_until(m_socket, m_receiveBuffer, '#',
[self](const boost::system::error_code &errorCode, size_t bytesRead) {
if (errorCode) {
// Notify our masters that we are ready to be destroyed
self->m_owner->OnConnectionClosed(self);
// An error occured
return;
}
// Grab the read data
std::istream stream(&self->m_receiveBuffer);
std::string data;
std::getline(stream, data, '#');
// Issue the next receive
if (!self->m_stopped) {
self->DoReceive();
}
});
}
//--------------------------------------------------------------------
//**ConnectionManager.cpp **
//#include "ConnectionManager.h"
//#include "Logger.h"
#include <boost/bind.hpp>
#include <system_error>
//------------------------------------------------------------------------------
ConnectionManager::ConnectionManager(unsigned port, size_t numThreads)
: m_io_service(), m_acceptor(m_io_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)),
m_listenSocket(m_io_service), m_threads(numThreads) {}
//------------------------------------------------------------------------------
ConnectionManager::~ConnectionManager() { Stop(); }
//------------------------------------------------------------------------------
void ConnectionManager::Start() {
if (m_io_service.stopped()) {
m_io_service.reset();
}
DoAccept();
for (auto &thread : m_threads) {
if (!thread.joinable()) {
thread = std::thread(&ConnectionManager::IoServiceThreadProc, this);
}
}
}
//------------------------------------------------------------------------------
void ConnectionManager::Stop() {
{
std::lock_guard<std::mutex> lock(m_connectionsMutex);
m_connections.clear();
}
// TODO - Will the stopping of the io_service be enough to kill all the connections and ultimately have them get
// destroyed?
// Because remember they have outstanding ref count to thier shared_ptr in the async handlers
m_io_service.stop();
for (auto &thread : m_threads) {
if (thread.joinable()) {
thread.join();
}
}
}
//------------------------------------------------------------------------------
void ConnectionManager::IoServiceThreadProc() {
try {
// Log that we are starting the io_service thread
{
const std::string msg("io_service socket thread starting.");
SomeNamespace::Logger::GetInstance().Log(msg, __FILE__, __LINE__,
SomeNamespace::Logger::LOGGER_SEVERITY_INFO);
}
// Run the asynchronous callbacks from the socket on this thread
// Until the io_service is stopped from another thread
m_io_service.run();
} catch (std::system_error &e) {
SomeNamespace::Logger::GetInstance().LogF("System error caught in io_service socket thread. Error Code: %d", e.code().value());
} catch (std::exception &e) {
SomeNamespace::Logger::GetInstance().LogF("Standard exception caught in io_service socket thread. Exception: %s", e.what());
} catch (...) {
SomeNamespace::Logger::GetInstance().LogF("Unhandled exception caught in io_service socket thread.");
}
SomeNamespace::Logger::GetInstance().LogF("io_service socket thread exiting.");
}
//------------------------------------------------------------------------------
void ConnectionManager::DoAccept() {
SomeNamespace::Logger::GetInstance().Log(__PRETTY_FUNCTION__, __FILE__, __LINE__, SomeNamespace::Logger::LOGGER_SEVERITY_INFO);
m_acceptor.async_accept(m_listenSocket, [this](const boost::system::error_code errorCode) {
if (errorCode) {
return;
}
{
// Create the connection from the connected socket
Connection::SharedPtr connection = Connection::Create(this, m_listenSocket);
{
std::lock_guard<std::mutex> lock(m_connectionsMutex);
m_connections.push_back(connection);
connection->Start();
}
}
DoAccept();
});
}
//------------------------------------------------------------------------------
void ConnectionManager::OnConnectionClosed(Connection::SharedPtr connection) {
std::lock_guard<std::mutex> lock(m_connectionsMutex);
auto itConnection = std::find(m_connections.begin(), m_connections.end(), connection);
if (itConnection != m_connections.end()) {
m_connections.erase(itConnection);
}
}
//------------------------------------------------------------------------------
//**main.cpp**
//#include "ConnectionManager.h"
#include <cstring>
#include <iostream>
#include <string>
int main() {
ConnectionManager connectionManager(4000, 2);
connectionManager.Start();
std::this_thread::sleep_for(std::chrono::minutes(1));
connectionManager.Stop();
}