I'm using boost 1.50 with VS2010, reading using a Windows file HANDLE (which seems to be relatively uncommon compared to asio use with sockets).
Problem
The handle_read callback gets to line 8 and returns the first part of it with all of line 1 appended; further callbacks cycle through from line 2 again, ad nauseam:
- open a short text file (below)
- get the expected handle_read callbacks with correct content for lines 1 through 7
- the next callback has a longer-than-expected bytes-read length parameter
- though not using length, getline extracts a correspondingly longer line from the asio stream buffer
- extracted content switches mid-line to repeat the first line from the input file
- further handle_read callbacks recycle lines 2 through 7, then the "long hybrid" line problem happens again
- ad nauseam
Input
LINE 1 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789
LINE 2 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789
...3--E similarly...
LINE F abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789
Output
Here are the first 15 lines of output (it continues forever):
line #1, length 70, getline() [69] 'LINE 1 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #2, length 70, getline() [69] 'LINE 2 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
...line #3 through #6 are fine too...
line #7, length 70, getline() [69] 'LINE 7 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #8, length 92, getline() [91] 'LINE 8 abcdefghijklmnoLINE 1 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #9, length 70, getline() [69] 'LINE 2 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
...line #10 through #13 are fine...
line #14, length 70, getline() [69] 'LINE 7 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #15, length 92, getline() [91] 'LINE 8 abcdefghijklmnoLINE 1 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
...
Please note how output lines #8 and #15 are a mix of input LINE 8 and LINE 1.
The code
#include "stdafx.h"
#include <cassert>
#include <iostream>
#include <string>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <Windows.h>
#include <WinBase.h>
class AsyncReader
{
public:
AsyncReader(boost::asio::io_service& io_service, HANDLE handle)
: io_service_(io_service),
input_buffer(/*size*/ 8192),
input_handle(io_service, handle)
{
start_read();
}
void start_read()
{
boost::asio::async_read_until(input_handle, input_buffer, '\n',
boost::bind(&AsyncReader::handle_read, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void handle_read(const boost::system::error_code& error, std::size_t length);
// void handle_write(const boost::system::error_code& error);
private:
boost::asio::io_service& io_service_;
boost::asio::streambuf input_buffer;
boost::asio::windows::stream_handle input_handle;
};
void AsyncReader::handle_read(const boost::system::error_code& error, std::size_t length)
{
if (!error)
{
static int count = 0;
++count;
// method 1: (same problem)
// const char* pStart = boost::asio::buffer_cast<const char*>(input_buffer.data());
// std::string s(pStart, length);
// input_buffer.consume(length);
// method 2:
std::istream is(&input_buffer);
std::string s;
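// Call getline() outside assert() so it still runs when NDEBUG is defined.
const bool got_line = !std::getline(is, s).fail();
assert(got_line);
(void)got_line; // avoids an unused-variable warning in release builds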
std::cout << "line #" << count << ", length " << length << ", getline() [" << s.size() << "] '" << s << "'\n";
start_read();
}
else if (error == boost::asio::error::not_found)
std::cerr << "Did not receive ending character!\n";
else
std::cerr << "Misc error during read!\n";
}
int _tmain(int argc, _TCHAR* argv[])
{
boost::asio::io_service io_service;
HANDLE handle = ::CreateFile(TEXT("c:/temp/input.txt"),
GENERIC_READ,
0, // share mode
NULL, // security attribute: NULL = default
OPEN_EXISTING, // creation disposition
FILE_FLAG_OVERLAPPED,
NULL // template file
);
AsyncReader obj(io_service, handle);
io_service.run();
std::cout << "Normal termination\n";
getchar();
return 0;
}
My thoughts
- It might be something in the CreateFile options - it didn't work at all until I switched to FILE_FLAG_OVERLAPPED
- not sure if there are other requirements that don't even manifest as errors...?
- I've tried input_buffer.commit and even .consume
- not sure if there's something like that I'm supposed to do, even though all the example code I could find (for sockets) suggests getline takes care of that...
- Exasperation / I miss Linux....
This mailing list post describes the same problem. While CreateFile with FILE_FLAG_OVERLAPPED allows for asynchronous I/O, it does not establish it as a stream in the context of Boost.Asio. For streams, Boost.Asio implements read_some as read_some_at with the offset always being 0. This is the source of the problem, as the ReadFile() documentation states:
For files that support byte offsets, you must specify a byte offset at which to start reading from the file.
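To see why a fixed offset of 0 produces exactly the output in the question, the behaviour can be simulated without Boost or Windows. The sketch below is an illustration under stated assumptions, not the library's implementation: read_some_at, read_lines and make_input are hypothetical names, a std::string stands in for the file, and the 512-byte read size is an assumption chosen because it matches the arithmetic of the observed output (seven 70-byte lines plus the first 22 bytes of LINE 8).

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Builds the 16-line input from the question: "LINE <label> " plus 62
// payload characters and a trailing '\n' (70 bytes per line).
std::string make_input()
{
    const std::string labels = "1234567890ABCDEF";
    const std::string payload =
        "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
    std::string file;
    for (std::size_t i = 0; i < labels.size(); ++i)
        file += "LINE " + std::string(1, labels[i]) + " " + payload + "\n";
    return file;
}

// Hypothetical stand-in for a positional read: returns up to max_bytes
// starting at offset, or an empty string at end of file.
std::string read_some_at(const std::string& file, std::size_t offset,
                         std::size_t max_bytes)
{
    assert(max_bytes > 0);
    if (offset >= file.size())
        return std::string();
    return file.substr(offset, max_bytes);
}

// Extracts up to max_lines '\n'-terminated lines, refilling the buffer in
// 512-byte reads (assumed size). With track_offset == false every read
// starts at offset 0, which mimics the stream handle in the question.
std::vector<std::string> read_lines(const std::string& file,
                                    bool track_offset,
                                    std::size_t max_lines)
{
    std::vector<std::string> lines;
    std::string buffer;
    std::size_t offset = 0;
    while (lines.size() < max_lines) {
        const std::string::size_type newline = buffer.find('\n');
        if (newline == std::string::npos) {
            const std::string chunk =
                read_some_at(file, track_offset ? offset : 0, 512);
            if (chunk.empty())
                break;                      // end of file
            offset += chunk.size();
            buffer += chunk;
            continue;
        }
        lines.push_back(buffer.substr(0, newline));
        buffer.erase(0, newline + 1);
    }
    return lines;
}
```

Calling read_lines(make_input(), false, 15) yields the 91-character hybrid of LINE 8 and LINE 1 in positions 8 and 15, as in the question, while passing true for track_offset yields all 16 lines in order.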
Adapting to Type Requirements
Boost.Asio is written very generically, often requiring arguments to meet a certain type requirement rather than be a specific type. Therefore, it is often possible to adapt either the I/O object or its service to obtain the desired behavior. First, one must identify what the adapted interface needs to support. In this case, async_read_until accepts any type fulfilling the type requirements of AsyncReadStream. AsyncReadStream's requirements are fairly basic, requiring a void async_read_some(MutableBufferSequence, ReadHandler) member function.
As the offset value will need to be tracked throughout the composed async_read_until operation, a simple type meeting the requirements of ReadHandler can be introduced that will wrap an application's ReadHandler, and update the offset accordingly.
namespace detail {
/// @brief Handler to wrap asynchronous read_some_at operations.
template <typename Handler>
class read_some_offset_handler
{
public:
read_some_offset_handler(Handler handler, boost::uint64_t& offset)
: handler_(handler),
offset_(offset)
{}
void operator()(
const boost::system::error_code& error,
std::size_t bytes_transferred)
{
offset_ += bytes_transferred;
// If bytes were transferred, then set the error code as success.
// EOF will be detected on next read. This is to account for
// the read_until algorithm behavior.
const boost::system::error_code result_ec =
(error && bytes_transferred)
? make_error_code(boost::system::errc::success) : error;
handler_(result_ec, bytes_transferred);
}
//private:
Handler handler_;
boost::uint64_t& offset_;
};
/// @brief Hook that allows the wrapped handler to be invoked
/// within specific context. This is critical to support
/// composed operations being invoked within a strand.
template <typename Function,
typename Handler>
void asio_handler_invoke(
Function function,
detail::read_some_offset_handler<Handler>* handler)
{
boost_asio_handler_invoke_helpers::invoke(
function, handler->handler_);
}
} // namespace detail
The asio_handler_invoke hook will be found through ADL to support invoking user handlers in the proper context. This is critical for thread safety when a composed operation is being invoked within a strand. For more details on composed operations and strands, see this answer.
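The two jobs of the wrapper handler above (advance the offset, and report success when an error arrives together with data) can be exercised without Boost or Windows. This is a rough sketch assuming C++11; read_handler, mask_error_if_data and offset_tracking_handler are hypothetical names, and std::error_code stands in for boost::system::error_code.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <system_error>

// Hypothetical handler signature, mirroring the shape of a ReadHandler.
typedef std::function<void(const std::error_code&, std::size_t)> read_handler;

// Reports success when an error arrives together with data, so the caller
// consumes the bytes first and sees the error again on the next read.
inline std::error_code mask_error_if_data(const std::error_code& error,
                                          std::size_t bytes_transferred)
{
    return (error && bytes_transferred) ? std::error_code() : error;
}

// Wraps an inner handler and keeps a caller-owned offset up to date.
class offset_tracking_handler
{
public:
    offset_tracking_handler(read_handler inner, std::uint64_t& offset)
        : inner_(inner), offset_(offset)
    {
        assert(static_cast<bool>(inner_)); // an empty function cannot be called
    }

    void operator()(const std::error_code& error,
                    std::size_t bytes_transferred)
    {
        offset_ += bytes_transferred; // the next read starts after this one
        inner_(mask_error_if_data(error, bytes_transferred),
               bytes_transferred);
    }

private:
    read_handler inner_;
    std::uint64_t& offset_;
};
```

The sketch leaves out the asio_handler_invoke hook, which only matters when the real handler runs inside a Boost.Asio strand.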
The following class will adapt boost::asio::windows::random_access_handle to meet the type requirements of AsyncReadStream.
/// @brief Adapts AsyncRandomAccessReadDevice to support AsyncReadStream.
template <typename AsyncRandomAccessReadDevice>
class basic_adapted_stream
: public AsyncRandomAccessReadDevice
{
public:
basic_adapted_stream(
boost::asio::io_service& io_service,
HANDLE handle
)
: AsyncRandomAccessReadDevice(io_service, handle),
offset_(0)
{}
template<typename MutableBufferSequence,
typename ReadHandler>
void async_read_some(
const MutableBufferSequence& buffers,
ReadHandler handler)
{
async_read_at(*this, offset_, buffers,
detail::read_some_offset_handler<ReadHandler>(handler, offset_));
}
private:
boost::uint64_t offset_;
};
Alternatively, boost::asio::windows::basic_stream_handle can be provided a custom type meeting the requirements of StreamHandleService types, and implement async_read_some in terms of async_read_some_at.
/// @brief Service that implements async_read_some with async_read_some_at.
class offset_stream_handle_service
: public boost::asio::windows::stream_handle_service
{
private:
// The type of the platform-specific implementation.
typedef boost::asio::detail::win_iocp_handle_service service_impl_type;
public:
/// The unique service identifier.
static boost::asio::io_service::id id;
/// Construct a new stream handle service for the specified io_service.
explicit offset_stream_handle_service(boost::asio::io_service& io_service)
: boost::asio::windows::stream_handle_service(io_service),
service_impl_(io_service),
offset_(0)
{}
/// Start an asynchronous read.
template <typename MutableBufferSequence,
typename ReadHandler>
void
async_read_some(
implementation_type& impl,
const MutableBufferSequence& buffers,
ReadHandler handler)
{
// Implement async_read_some in terms of async_read_some_at. The provided
// ReadHandler will be hoisted in an internal handler so that offset_ can
// be properly updated.
service_impl_.async_read_some_at(impl, offset_, buffers,
detail::read_some_offset_handler<ReadHandler>(handler, offset_));
}
private:
// The platform-specific implementation.
service_impl_type service_impl_;
boost::uint64_t offset_;
};
boost::asio::io_service::id offset_stream_handle_service::id;
I have opted for simplicity in the example code, but the same service will be used by multiple I/O objects. Thus, the offset_stream_handle_service would need to manage an offset per handle to function properly when multiple I/O objects use the service.
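One way to picture per-handle bookkeeping is a small table keyed by the native handle. The sketch below is only an assumption about how such a table might look: offset_table and handle_key are hypothetical names, nothing here is part of Boost.Asio, and a real service would also need to guard the map if completions can run on several threads.

```cpp
#include <cassert>
#include <cstdint>
#include <map>

// Hypothetical key type; on Windows this could be the native HANDLE.
typedef const void* handle_key;

// Keeps one read offset per open handle instead of a single offset
// shared by every I/O object that uses the service.
class offset_table
{
public:
    // Starts (or restarts) tracking a handle at offset 0.
    void open(handle_key key) { offsets_[key] = 0; }

    // Stops tracking a handle, for example when it is closed.
    void close(handle_key key) { offsets_.erase(key); }

    // Returns the offset at which the next read on key should start.
    std::uint64_t next_offset(handle_key key) const
    {
        const std::map<handle_key, std::uint64_t>::const_iterator it =
            offsets_.find(key);
        assert(it != offsets_.end()); // the handle must have been opened
        return it->second;
    }

    // Records that a read on key transferred bytes_transferred bytes.
    void advance(handle_key key, std::uint64_t bytes_transferred)
    {
        const std::map<handle_key, std::uint64_t>::iterator it =
            offsets_.find(key);
        assert(it != offsets_.end()); // the handle must have been opened
        it->second += bytes_transferred;
    }

private:
    std::map<handle_key, std::uint64_t> offsets_;
};
```

In the service, async_read_some would then look up next_offset for the handle being read, and the wrapping handler would call advance for that same handle on completion.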
To use the adapted types, modify the AsyncReader::input_handle member variable to be either a basic_adapted_stream<boost::asio::windows::random_access_handle> (adapted I/O object) or boost::asio::windows::basic_stream_handle<offset_stream_handle_service> (adapted service).
Example
Here is the complete example based on the original code, with the type of AsyncReader::input_handle changed:
#include "stdafx.h"
#include <cassert>
#include <iostream>
#include <string>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <Windows.h>
#include <WinBase.h>
namespace detail {
/// @brief Handler to wrap asynchronous read_some_at operations.
template <typename Handler>
class read_some_offset_handler
{
public:
read_some_offset_handler(Handler handler, boost::uint64_t& offset)
: handler_(handler),
offset_(offset)
{}
void operator()(
const boost::system::error_code& error,
std::size_t bytes_transferred)
{
offset_ += bytes_transferred;
// If bytes were transferred, then set the error code as success.
// EOF will be detected on next read. This is to account for
// the read_until algorithm behavior.
const boost::system::error_code result_ec =
(error && bytes_transferred)
? make_error_code(boost::system::errc::success) : error;
handler_(result_ec, bytes_transferred);
}
//private:
Handler handler_;
boost::uint64_t& offset_;
};
/// @brief Hook that allows the wrapped handler to be invoked
/// within specific context. This is critical to support
/// composed operations being invoked within a strand.
template <typename Function,
typename Handler>
void asio_handler_invoke(
Function function,
detail::read_some_offset_handler<Handler>* handler)
{
boost_asio_handler_invoke_helpers::invoke(
function, handler->handler_);
}
} // namespace detail
/// @brief Adapts AsyncRandomAccessReadDevice to support AsyncReadStream.
template <typename AsyncRandomAccessReadDevice>
class basic_adapted_stream
: public AsyncRandomAccessReadDevice
{
public:
basic_adapted_stream(
boost::asio::io_service& io_service,
HANDLE handle
)
: AsyncRandomAccessReadDevice(io_service, handle),
offset_(0)
{}
template<typename MutableBufferSequence,
typename ReadHandler>
void async_read_some(
const MutableBufferSequence& buffers,
ReadHandler handler)
{
async_read_at(*this, offset_, buffers,
detail::read_some_offset_handler<ReadHandler>(handler, offset_));
}
private:
boost::uint64_t offset_;
};
/// @brief Service that implements async_read_some with async_read_some_at.
class offset_stream_handle_service
: public boost::asio::windows::stream_handle_service
{
private:
// The type of the platform-specific implementation.
typedef boost::asio::detail::win_iocp_handle_service service_impl_type;
public:
/// The unique service identifier.
static boost::asio::io_service::id id;
/// Construct a new stream handle service for the specified io_service.
explicit offset_stream_handle_service(boost::asio::io_service& io_service)
: boost::asio::windows::stream_handle_service(io_service),
service_impl_(io_service),
offset_(0)
{}
/// Start an asynchronous read.
template <typename MutableBufferSequence,
typename ReadHandler>
void
async_read_some(
implementation_type& impl,
const MutableBufferSequence& buffers,
ReadHandler handler)
{
// Implement async_read_some in terms of async_read_some_at. The provided
// ReadHandler will be hoisted in an internal handler so that offset_ can
// be properly updated.
service_impl_.async_read_some_at(impl, offset_, buffers,
detail::read_some_offset_handler<ReadHandler>(handler, offset_));
}
private:
// The platform-specific implementation.
service_impl_type service_impl_;
boost::uint64_t offset_;
};
boost::asio::io_service::id offset_stream_handle_service::id;
#ifndef ADAPT_IO_SERVICE
typedef basic_adapted_stream<
boost::asio::windows::random_access_handle> adapted_stream;
#else
typedef boost::asio::windows::basic_stream_handle<
offset_stream_handle_service> adapted_stream;
#endif
class AsyncReader
{
public:
AsyncReader(boost::asio::io_service& io_service, HANDLE handle)
: io_service_(io_service),
input_buffer(/*size*/ 8192),
input_handle(io_service, handle)
{
start_read();
}
void start_read()
{
boost::asio::async_read_until(input_handle, input_buffer, '\n',
boost::bind(&AsyncReader::handle_read, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void handle_read(const boost::system::error_code& error, std::size_t length);
// void handle_write(const boost::system::error_code& error);
private:
boost::asio::io_service& io_service_;
boost::asio::streambuf input_buffer;
adapted_stream input_handle;
};
void AsyncReader::handle_read(const boost::system::error_code& error, std::size_t length)
{
if (!error)
{
static int count = 0;
++count;
// method 1: (same problem)
// const char* pStart = boost::asio::buffer_cast<const char*>(input_buffer.data());
// std::string s(pStart, length);
// input_buffer.consume(length);
// method 2:
std::istream is(&input_buffer);
std::string s;
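// Call getline() outside assert() so it still runs when NDEBUG is defined.
const bool got_line = !std::getline(is, s).fail();
assert(got_line);
(void)got_line; // avoids an unused-variable warning in release builds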
std::cout << "line #" << count << ", length " << length << ", getline() [" << s.size() << "] '" << s << "'\n";
start_read();
}
else if (error == boost::asio::error::not_found)
std::cerr << "Did not receive ending character!\n";
else
std::cerr << "Misc error during read!\n";
}
int _tmain(int argc, _TCHAR* argv[])
{
boost::asio::io_service io_service;
HANDLE handle = ::CreateFile(TEXT("c:/temp/input.txt"),
GENERIC_READ,
0, // share mode
NULL, // security attribute: NULL = default
OPEN_EXISTING, // creation disposition
FILE_FLAG_OVERLAPPED,
NULL // template file
);
AsyncReader obj(io_service, handle);
io_service.run();
std::cout << "Normal termination\n";
getchar();
return 0;
}
Which produces the following output when using the input from the original question:
line #1, length 70, getline() [69] 'LINE 1 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #2, length 70, getline() [69] 'LINE 2 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #3, length 70, getline() [69] 'LINE 3 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #4, length 70, getline() [69] 'LINE 4 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #5, length 70, getline() [69] 'LINE 5 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #6, length 70, getline() [69] 'LINE 6 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #7, length 70, getline() [69] 'LINE 7 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #8, length 70, getline() [69] 'LINE 8 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #9, length 70, getline() [69] 'LINE 9 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #10, length 70, getline() [69] 'LINE 0 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #11, length 70, getline() [69] 'LINE A abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #12, length 70, getline() [69] 'LINE B abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #13, length 70, getline() [69] 'LINE C abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #14, length 70, getline() [69] 'LINE D abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #15, length 70, getline() [69] 'LINE E abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
Misc error during read!
Normal termination
My input file did not have a \n character at the end of LINE F. Thus, AsyncReader::handle_read() gets invoked with an error of boost::asio::error::eof and input_buffer's contents contain LINE F. After modifying the final else case to print more information:
...
else
{
std::cerr << "Error: " << error.message() << "\n";
if (std::size_t buffer_size = input_buffer.size())
{
boost::asio::streambuf::const_buffers_type bufs = input_buffer.data();
std::string contents(boost::asio::buffers_begin(bufs),
boost::asio::buffers_begin(bufs) + buffer_size);
std::cerr << "stream contents: '" << contents << "'\n";
}
}
I get the following output:
line #1, length 70, getline() [69] 'LINE 1 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #2, length 70, getline() [69] 'LINE 2 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #3, length 70, getline() [69] 'LINE 3 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #4, length 70, getline() [69] 'LINE 4 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #5, length 70, getline() [69] 'LINE 5 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #6, length 70, getline() [69] 'LINE 6 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #7, length 70, getline() [69] 'LINE 7 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #8, length 70, getline() [69] 'LINE 8 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #9, length 70, getline() [69] 'LINE 9 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #10, length 70, getline() [69] 'LINE 0 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #11, length 70, getline() [69] 'LINE A abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #12, length 70, getline() [69] 'LINE B abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #13, length 70, getline() [69] 'LINE C abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #14, length 70, getline() [69] 'LINE D abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #15, length 70, getline() [69] 'LINE E abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
Error: End of file
stream contents: 'LINE F abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
Normal termination
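When the read ends with eof but the buffer still holds an unterminated final line, the leftover bytes can be treated as the last line rather than only being reported as an error. A minimal sketch, using std::stringbuf as a stand-in for boost::asio::streambuf (both derive from std::streambuf); drain_remaining_lines is a hypothetical helper, not part of Boost.Asio:

```cpp
#include <cassert>
#include <istream>
#include <sstream>
#include <streambuf>
#include <string>
#include <vector>

// Extracts everything left in buffer as lines. std::getline() also returns
// a final line that has no trailing '\n', which is the case at end of file.
std::vector<std::string> drain_remaining_lines(std::streambuf& buffer)
{
    std::vector<std::string> lines;
    std::istream is(&buffer);
    std::string line;
    while (std::getline(is, line))
        lines.push_back(line);
    return lines;
}
```

In the eof branch of handle_read, a call such as drain_remaining_lines(input_buffer) would hand back LINE F so it can be processed like the other lines.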
One option is to fseek() the file to the next position before the user's ReadHandler is called. Then async_read_some() can be implemented as async_read_at(ftell()).
The AsyncReader can use ReadUntilHandle instead of the stream_handle:
class ReadUntilHandle : public boost::asio::windows::random_access_handle
{
public:
ReadUntilHandle(boost::asio::io_service& ios, HANDLE handle)
: boost::asio::windows::random_access_handle(ios, handle)
{}
template <typename MutableBufferSequence, typename Handler>
// The handler is taken by value because the composed operation may pass a temporary.
void async_read_some(const MutableBufferSequence& buffers, Handler handler)
{
LARGE_INTEGER offset;
offset.QuadPart = 0;
if (::SetFilePointerEx(native_handle(), offset, &offset, FILE_CURRENT)) {
async_read_some_at(offset.QuadPart, buffers,
std::bind(&on_read_complete<Handler>, handler,
native_handle(), std::ref(get_io_service()),
std::placeholders::_1, std::placeholders::_2));
} else {
boost::system::error_code error(::GetLastError(), boost::asio::error::get_system_category());
get_io_service().post(boost::asio::detail::bind_handler(handler, error, 0));
}
}
private:
template <typename Handler> static void
on_read_complete(Handler& handler, HANDLE native_handle, boost::asio::io_service& ios,
boost::system::error_code error, std::size_t length)
{
if (0 != length) { // update file position
LARGE_INTEGER offset;
offset.QuadPart = length;
if (!::SetFilePointerEx(native_handle, offset, NULL, FILE_CURRENT) && !error) {
error.assign(::GetLastError(), boost::asio::error::get_system_category());
}
}
ios.dispatch(boost::asio::detail::bind_handler(handler, error, length));
}
};
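The idea behind ReadUntilHandle (read at the current file position, then move the position forward by the number of bytes read) can be tried out synchronously with standard streams. This is a rough sketch under stated assumptions: std::istringstream stands in for the file, tellg() and seekg() play the roles of ftell() and fseek() (or SetFilePointerEx), and read_at and read_some are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <ios>
#include <istream>
#include <sstream>
#include <string>

// Positional read: reads up to count bytes starting at offset and then
// restores the stream position, so the position is unaffected by the read.
std::size_t read_at(std::istream& file, std::streamoff offset,
                    char* out, std::size_t count)
{
    assert(out != 0);
    file.clear();
    const std::istream::pos_type saved = file.tellg();
    file.seekg(offset, std::ios::beg);
    file.read(out, static_cast<std::streamsize>(count));
    const std::size_t got = static_cast<std::size_t>(file.gcount());
    file.clear();               // a short read sets eofbit and failbit
    file.seekg(saved);
    return got;
}

// Stream-style read built on read_at(): start at the current position
// and advance the position by the number of bytes actually read.
std::size_t read_some(std::istream& file, char* out, std::size_t count)
{
    file.clear();
    const std::streamoff here = file.tellg();
    const std::size_t got = read_at(file, here, out, count);
    file.seekg(static_cast<std::streamoff>(got), std::ios::cur);
    return got;
}
```

Repeated calls to read_some then walk through the data in order and return 0 at the end, which is the behaviour async_read_until expects from a stream.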