I have a multithread application which uses boost::asio and boost::coroutine via its integration in boost::asio. Every thread has its own io_service object. The only shared state between threads are connection pools which are locked with mutex when connection is get or returned from/to the connection pool. When there is not enough connections in the pool I push infinite asio::steady_tiemer in internal structure of the pool and asynchronously waiting on it and I yielding from the couroutine function. When other thread returns connection to the pool it checks whether there is waiting timers, it gets waiting timer from the internal structure, it gets its io_service object and posts a lambda which wakes up the timer to resume the suspended coroutine. I have random crashes in the application. I try to investigate the problem with valgrind. It founds some issues but I cannot understand them because they happen in boost::coroutine and boost::asio internals. Here are fragments from my code and from valgrind output. Can someone see and explain the problem?
Here is the calling code:
template <class ContextsType>
void executeRequests(ContextsType& avlRequestContexts)
{
AvlRequestDataList allRequests;
for(auto& requestContext : avlRequestContexts)
{
if(!requestContext.pullProvider || !requestContext.toAskGDS())
continue;
auto& requests = requestContext.pullProvider->getRequestsData();
copy(requests.begin(), requests.end(), back_inserter(allRequests));
}
if(allRequests.size() == 0)
return;
boost::asio::io_service ioService;
curl::AsioMultiplexer multiplexer(ioService);
for(auto& request : allRequests)
{
using namespace boost::asio;
spawn(ioService, [&multiplexer, &request](yield_context yield)
{
request->prepare(multiplexer, yield);
});
}
while(true)
{
try
{
VLOG_DEBUG(avlGeneralLogger, "executeRequests: Starting ASIO event loop.");
ioService.run();
VLOG_DEBUG(avlGeneralLogger, "executeRequests: ASIO event loop finished.");
break;
}
catch(const std::exception& e)
{
VLOG_ERROR(avlGeneralLogger, "executeRequests: Error while executing GDS request: " << e.what());
}
catch(...)
{
VLOG_ERROR(avlGeneralLogger, "executeRequests: Unknown error while executing GDS request.");
}
}
}
Here is the prepare
function implementation which is called in spawned lambda:
void AvlRequestData::prepareImpl(curl::AsioMultiplexer& multiplexer,
boost::asio::yield_context yield)
{
auto& ioService = multiplexer.getIoService();
_connection = _pool.getConnection(ioService, yield);
_connection->prepareRequest(xmlRequest, xmlResponse, requestTimeoutMS);
multiplexer.addEasyHandle(_connection->getHandle(),
[this](const curl::EasyHandleResult& result)
{
if(0 == result.responseCode)
returnQuota();
VLOG_DEBUG(lastSeatLogger, "Response " << id << ": " << xmlResponse);
_pool.addConnection(std::move(_connection));
});
}
void AvlRequestData::prepare(curl::AsioMultiplexer& multiplexer,
boost::asio::yield_context yield)
{
try
{
prepareImpl(multiplexer, yield);
}
catch(const std::exception& e)
{
VLOG_ERROR(lastSeatLogger, "Error wile preparing request: " << e.what());
returnQuota();
}
catch(...)
{
VLOG_ERROR(lastSeatLogger, "Unknown error while preparing request.");
returnQuota();
}
}
The returnQuota
function is pure virtual method of the AvlRequestData
class and its implementation for the TravelportRequestData
class which is used in all my tests is the following:
void returnQuota() const override
{
auto& avlQuotaManager = AvlQuotaManager::getInstance();
avlQuotaManager.consumeQuotaTravelport(-1);
}
Here are push and pop methods of the connection pool.
auto AvlConnectionPool::getConnection(
TimerPtr timer,
asio::yield_context yield) -> ConnectionPtr
{
lock_guard<mutex> lock(_mutex);
while(_connections.empty())
{
_timers.emplace_back(timer);
timer->expires_from_now(
asio::steady_timer::clock_type::duration::max());
_mutex.unlock();
coroutineAsyncWait(*timer, yield);
_mutex.lock();
}
ConnectionPtr connection = std::move(_connections.front());
_connections.pop_front();
VLOG_TRACE(defaultLogger, str(format("Getted connection from pool: %s. Connections count %d.")
% _connectionPoolName % _connections.size()));
++_connectionsGiven;
return connection;
}
void AvlConnectionPool::addConnection(ConnectionPtr connection,
Side side /* = Back */)
{
lock_guard<mutex> lock(_mutex);
if(Front == side)
_connections.emplace_front(std::move(connection));
else
_connections.emplace_back(std::move(connection));
VLOG_TRACE(defaultLogger, str(format("Added connection to pool: %s. Connections count %d.")
% _connectionPoolName % _connections.size()));
if(_timers.empty())
return;
auto timer = _timers.back();
_timers.pop_back();
auto& ioService = timer->get_io_service();
ioService.post([timer](){ timer->cancel(); });
VLOG_TRACE(defaultLogger, str(format("Connection pool %s: Waiting thread resumed.")
% _connectionPoolName));
}
This is implementation of coroutineAsyncWait.
inline void coroutineAsyncWait(boost::asio::steady_timer& timer,
boost::asio::yield_context yield)
{
boost::system::error_code ec;
timer.async_wait(yield[ec]);
if(ec && ec != boost::asio::error::operation_aborted)
throw std::runtime_error(ec.message());
}
And finally the first part of the valgrind output:
==8189== Thread 41:
==8189== Invalid read of size 8
==8189== at 0x995F84: void boost::coroutines::detail::trampoline_push_void, void, boost::asio::detail::coro_entry_point, void (anonymous namespace)::executeRequests > >(std::vector<(anonymous namespace)::AvlRequestContext, std::allocator<(anonymous namespace)::AvlRequestContext> >&)::{lambda(boost::asio::basic_yield_context >)#1}>&, boost::coroutines::basic_standard_stack_allocator > >(long) (trampoline_push.hpp:65)
==8189== Address 0x2e3b5528 is not stack'd, malloc'd or (recently) free'd
When I use valgrind with debugger attached it stops in the following function in trampoline_push.hpp in boost::coroutine library.
53│ template< typename Coro >
54│ void trampoline_push_void( intptr_t vp)
55│ {
56│ typedef typename Coro::param_type param_type;
57│
58│ BOOST_ASSERT( vp);
59│
60│ param_type * param(
61│ reinterpret_cast< param_type * >( vp) );
62│ BOOST_ASSERT( 0 != param);
63│
64│ Coro * coro(
65├> reinterpret_cast< Coro * >( param->coro) );
66│ BOOST_ASSERT( 0 != coro);
67│
68│ coro->run();
69│ }