Yes, I know there have been a few questions about timeouts in boost::asio. My problem might be too simple for the asio guys to solve here.
I am using boost::asio over TCP to read data from a network continuously, in a loop, as fast as I can.
The following function, ReadData(), gets called continuously from a worker std::thread in a while loop.
std::size_t ReadData(std::vector<unsigned char> & buffer, unsigned int size_to_read) {
    boost::system::error_code error_code;
    buffer.resize(size_to_read);
    // Receive body
    std::size_t bytes_read = boost::asio::read(*m_socket, boost::asio::buffer(buffer), error_code);
    if (bytes_read == 0) {
        // log error
        return 0;
    }
    return bytes_read;
}
It works fine. Returns the data. All is well.
All I want is to use a timeout for the boost::asio::read call. I learnt that I need to use boost::asio::async_read together with a timer's async_wait for the timeout technique to work.
One Boost example suggests using boost::asio::async_read_until. Should I use boost::asio::async_read or boost::asio::async_read_until?
It does not matter to me whether I use boost::asio::async_read, boost::asio::async_read_until or boost::asio::read, but I want the read to be started and completed within the call to my method ReadData, so that the client code is not affected.
How can I achieve this? Please suggest.
OK, something like this should suit your purpose:
Rationale:
You appear to want to use blocking operations. Since that is the case, there is a good chance that you're not running a thread to pump the io loop.
So we kick off two simultaneous async tasks on the socket's io loop and then spawn a thread to:
a) reset (actually restart) the loop in case it's already been exhausted
b) run the loop to exhaustion (we could be cleverer here and only run it until the handler has indicated that some condition has been met, but that's a lesson for another day)
#include <boost/asio.hpp>
#include <cassert>
#include <cstddef>
#include <memory>
#include <type_traits>
#include <utility>

// Note: this uses the older io_service interface. Newer Boost releases rename
// io_service to io_context and no longer provide get_io_service().
// async_read fills the buffer, so the sequence must be a mutable one.
template<class Stream, class MutableBufferSequence, class Handler>
auto async_read_with_timeout(Stream& stream, MutableBufferSequence&& sequence, std::size_t millis, Handler&& handler)
{
    using handler_type = std::decay_t<Handler>;
    using buffer_sequence_type = std::decay_t<MutableBufferSequence>;
    using stream_type = Stream;

    // Shared state for the two racing operations (read and timer).
    // The shared_ptr captured by each handler keeps it alive until both have run.
    struct state_machine : std::enable_shared_from_this<state_machine>
    {
        state_machine(stream_type& stream, buffer_sequence_type sequence, handler_type handler)
        : stream_(stream)
        , sequence_(std::move(sequence))
        , handler_(std::move(handler))
        {}

        void start(std::size_t millis)
        {
            // Start the timer and the read; the strand serialises their handlers.
            timer_.expires_from_now(boost::posix_time::milliseconds(millis));
            timer_.async_wait(strand_.wrap([self = this->shared_from_this()](auto&& ec) {
                self->handle_timeout(ec);
            }));
            boost::asio::async_read(stream_, sequence_,
                strand_.wrap([self = this->shared_from_this()](auto&& ec, auto size){
                    self->handle_read(ec, size);
                }));
        }

        void handle_timeout(boost::system::error_code const& ec)
        {
            // Timer expired before the read completed: cancel the read.
            // Its handler then runs with operation_aborted.
            if (not ec and not completed_)
            {
                boost::system::error_code sink;
                stream_.cancel(sink);
            }
        }

        void handle_read(boost::system::error_code const& ec, std::size_t size)
        {
            // Read finished (or was cancelled): stop the timer and report.
            assert(not completed_);
            boost::system::error_code sink;
            timer_.cancel(sink);
            completed_ = true;
            handler_(ec, size);
        }

        stream_type& stream_;
        buffer_sequence_type sequence_;
        handler_type handler_;
        boost::asio::io_service::strand strand_ { stream_.get_io_service() };
        boost::asio::deadline_timer timer_ { stream_.get_io_service() };
        bool completed_ = false;
    };

    auto psm = std::make_shared<state_machine>(stream,
                                               std::forward<MutableBufferSequence>(sequence),
                                               std::forward<Handler>(handler));
    psm->start(millis);
}
#include <future>
#include <vector>

// Blocking wrapper: on timeout, ec is set to boost::asio::error::operation_aborted.
std::size_t ReadData(boost::asio::ip::tcp::socket& socket,
                     std::vector<unsigned char> & buffer,
                     unsigned int size_to_read,
                     boost::system::error_code& ec) {
    buffer.resize(size_to_read);
    ec.clear();
    std::size_t bytes_read = 0;
    auto& executor = socket.get_io_service();
    async_read_with_timeout(socket, boost::asio::buffer(buffer),
                            2000, // 2 seconds for example
                            [&](auto&& err, auto size){
                                ec = err;
                                bytes_read = size;
                            });

    // todo: use a more scalable executor than spawning threads
    auto future = std::async(std::launch::async, [&] {
        // restart the loop if an earlier run already exhausted it
        if (executor.stopped()) {
            executor.reset();
        }
        // run until both the read handler and the timer handler have completed
        executor.run();
    });
    future.wait();

    return bytes_read;
}