What is the best way to send many buffers with the Boost.Asio method async_send_to
?
The whole send procedure can be repeated at any time. Furthermore, I want to determine the (correct) elapsed time of each send procedure.
I tried in this way:
//MainWindow.h
// Main window of the UDP send test: lets the user connect to a receiver and
// then fire a batch of asynchronous UDP sends while measuring the elapsed time.
class MainWindow : public QMainWindow
{
Q_OBJECT
public:
explicit MainWindow(QWidget *parent = 0);
~MainWindow();
private slots:
// Resolves the receiver endpoint from the UI fields and opens the UDP socket.
void on_connectPushButton_clicked();
// Queues m_NumBuffersToSend async sends and runs the io_service until they complete.
void on_asyncSendPushButton_clicked();
private:
// Generated UI object; created in the constructor, deleted in the destructor.
Ui::MainWindow *ui;
// Started right before the sends are queued; read in stopTimerAndLog().
QTime m_Timer;
// Size of each buffer, taken from the UI when a send batch starts.
int m_BufferSize;
// Number of buffers in the current batch, taken from the UI.
int m_NumBuffersToSend;
// Number of completion handlers that have run for the current batch.
int m_TransferredBuffers;
boost::asio::io_service m_IOService;
// Created in on_connectPushButton_clicked(); empty until then.
std::unique_ptr<boost::asio::ip::udp::socket> m_pSocket;
// Destination of every datagram, resolved in on_connectPushButton_clicked().
boost::asio::ip::udp::endpoint m_ReceiverEndpoint;
// Completion handler for each async_send_to; counts buffers and ends the batch.
void handle_send(const boost::system::error_code& error, std::size_t size);
// Reads the timer, logs the batch statistics and shows the throughput in the UI.
void stopTimerAndLog();
};
//MainWindow.cpp
#include "MainWindow.h"
#include "ui_MainWindow.h"
//Some Qt includes
#include <boost/timer/timer.hpp>
#include <boost/array.hpp>
#include <boost/bind.hpp>
using boost::asio::ip::udp;
// Constructs the window and its generated UI; all batch counters start at zero.
//
// The initializer list is written in the order in which initialization really
// happens (base class first, then members in declaration order: ui, then the
// counters). The previous ordering was misleading and triggers -Wreorder.
MainWindow::MainWindow(QWidget *parent) :
    QMainWindow(parent),
    ui(new Ui::MainWindow),
    m_BufferSize(0),
    m_NumBuffersToSend(0),
    m_TransferredBuffers(0)
{
    ui->setupUi(this);
}
// Releases the generated UI object that the constructor allocated with new.
MainWindow::~MainWindow()
{
delete ui;
}
void MainWindow::on_connectPushButton_clicked()
{
try
{
udp::resolver resolver(m_IOService);
udp::resolver::query query(udp::v4(), ui->serverIpAddressLineEdit->text().toStdString(),
ui->serverPortLineEdit->text().toStdString());
m_ReceiverEndpoint = *resolver.resolve(query);
m_pSocket = std::unique_ptr<boost::asio::ip::udp::socket>(new boost::asio::ip::udp::socket(m_IOService));
m_pSocket->open(udp::v4());
}
catch (std::exception& e)
{
std::cerr << e.what() << std::endl;
}
}
// Reads the elapsed time of the current batch, logs the batch parameters and
// the resulting throughput, and shows the throughput in the UI.
//
// Throughput is reported in MBit/s with 1 MBit = 1024 * 1024 bit, as before.
void MainWindow::stopTimerAndLog()
{
    const int tmm = m_Timer.elapsed();

    // Convert to double BEFORE multiplying: the int product of buffer size and
    // buffer count overflows for large batches.
    const double totalBytes = static_cast<double>(m_BufferSize)
                            * static_cast<double>(m_NumBuffersToSend);

    // QTime has millisecond resolution, so a fast batch can report 0 ms.
    // Dividing by that would yield inf/NaN; report 0 instead of feeding a
    // non-finite value to the spin box.
    const double mBitPerSecond = (tmm > 0)
        ? 1000.0 * totalBytes / (1024.0 * 1024.0 * tmm) * 8.0
        : 0.0;

    LOG_INFO(__FUNCTION__ << ": " << QString("Buffer size: %1").arg(m_BufferSize).toStdString());
    LOG_INFO(__FUNCTION__ << ": " << QString("Num Buffers: %1").arg(m_NumBuffersToSend).toStdString());
    LOG_INFO(__FUNCTION__ << ": " << QString("Time: %1 ms").arg(tmm).toStdString());
    LOG_INFO(__FUNCTION__ << ": " << QString("%1 MBit/s").arg(mBitPerSecond).toStdString());
    ui->mBitperSecondDoubleSpinBox->setValue(mBitPerSecond);
}
// Completion handler invoked once per async_send_to.
// Counts the buffer as handled (whether or not the send failed), logs any
// error, and once the whole batch has been handled stops the timer and the
// io_service.
void MainWindow::handle_send(const boost::system::error_code &error, size_t size)
{
    ++m_TransferredBuffers;

    if (error)
    {
        //missing error propagation to main thread
        LOG_ERROR(__FUNCTION__ << ": ERROR: Client error while sending (error code = " << error << "): ");
        LOG_ERROR(__FUNCTION__ << ": ERROR: Recovering...");
    }

    // Nothing more to do while buffers of this batch are still outstanding.
    if (m_TransferredBuffers < m_NumBuffersToSend)
        return;

    stopTimerAndLog();
    m_IOService.stop();
}
// Slot for the "async send" button.
// Reads buffer size and buffer count from the UI, queues one async_send_to per
// buffer and runs the io_service until every completion handler has run.
// Exceptions are reported on std::cerr and swallowed.
//
// Fixes compared to the previous version:
//  * Every send gets its own slice of the payload. Asio requires the memory
//    passed to async_send_to to stay valid and unmodified until the handler is
//    called; the old code kept overwriting one shared buffer while sends were
//    still pending.
//  * The io_service is reset before run(). After run() has returned (or stop()
//    was called) the service stays in the stopped state, so on a second click
//    run() returned immediately and the buffer was freed with sends pending.
//  * The payload is owned by a unique_ptr, so it is released on every path.
//  * Clicking "send" before "connect" no longer dereferences a null socket.
void MainWindow::on_asyncSendPushButton_clicked()
{
    if (!m_pSocket)
    {
        std::cerr << "Not connected: press the connect button before sending." << std::endl;
        return;
    }

    try
    {
        m_BufferSize = ui->sendBufferSpinBox->value();
        m_NumBuffersToSend = ui->numBufferSpinBox->value();
        m_TransferredBuffers = 0;

        const std::size_t bufferSize = m_BufferSize > 0 ? static_cast<std::size_t>(m_BufferSize) : 0;
        const std::size_t numBuffers = m_NumBuffersToSend > 0 ? static_cast<std::size_t>(m_NumBuffersToSend) : 0;

        // One contiguous allocation holding a private slice for every send.
        std::unique_ptr<char[]> payload(new char[bufferSize * numBuffers]);

        // Leave the stopped state left behind by the previous batch.
        m_IOService.reset();

        m_Timer.start();
        for (std::size_t i = 0; i < numBuffers; ++i)
        {
            char* const slice = payload.get() + i * bufferSize;
            memset(slice, static_cast<int>(i), bufferSize);
            m_pSocket->async_send_to(boost::asio::buffer(slice, bufferSize),
                                     m_ReceiverEndpoint,
                                     boost::bind(&MainWindow::handle_send, this,
                                                 boost::asio::placeholders::error,
                                                 boost::asio::placeholders::bytes_transferred));
        }

        // Blocks until handle_send has stopped the service or no work is left.
        m_IOService.run();
    }
    catch (const std::exception& e)
    {
        std::cerr << e.what() << std::endl;
    }
}
As you can see, the user can click on the connect button (on_connectPushButton_clicked
). And then the send procedure starts by clicking on the async send button (on_asyncSendPushButton_clicked
). And here I start the timer and call m_NumBuffersToSend
times the async_send_to method. Then I run the IOService
. For each async_send_to
the handler handle_send
will be called, and the m_TransferredBuffers
variable will be incremented until it reaches m_NumBuffersToSend
. If this is the case, I stop the timer and the IOService
.
But if I compare the time calculated in my program with the timing of the UDP packets actually sent, as captured with Wireshark, there is always a big difference. How can I get a more accurate time measurement?
Is it possible to place the m_IOService.run();
call outside on_asyncSendPushButton_clicked
?
Well.
I'm not sure what you are observing. Here's the answer to
Q. Is it possible to place the m_IOService.run(); call outside on_asyncSendPushButton_clicked
Yes, you should use io_service::work
to keep the IO service running. Here's a demo program:
Live On Coliru
- I've created a single IO thread to serve the async operations/completion handlers
I've stripped the Qt dependency; demo Run
s are configured randomly:
// State of one demo run; size and length are chosen randomly per instance.
struct Run {
// Payload buffer of 200..999 bytes, zero-initialized.
std::vector<char> buffer = std::vector<char>(rand()%800 + 200, '\0');
// Number of buffers this run still has to send: initially 1..10.
int remainingToSend = rand()%10 + 1;
// Number of buffers counted as transferred so far.
int transferredBuffers = 0;
// Time at which this Run object was created; the run is timed from here.
Clock::time_point start = Clock::now();
// Records the timing sample of this run.
void stopTimerAndLog() const;
};
As a bonus, I added proper statistics using Boost Accumulators.
Instead of doing (expensive) IO in stopTimerAndLog
we add the samples to the accumulators:
void stopTimerAndLog()
{
using namespace std::chrono;
Clock::duration const elapsed = Clock::now() - start;
int tmm = duration_cast<microseconds>(elapsed).count();
double mBitPerSecond = tmm
? buffer.size() * transferredBuffers * 8.0 / 1024 / 1024 / (tmm / 1000000.0)
: std::numeric_limits<double>::infinity();
std::lock_guard<std::mutex> lk(demo_results::mx);
demo_results::bufsize(buffer.size());
demo_results::micros(tmm);
if (tmm)
demo_results::mbps(mBitPerSecond);
}
You can run multiple demo Runs in overlap:
// Connect once to the host/port given on the command line, then start 100
// test runs, sleeping 10 ms before each one.
Demo demo;
demo.on_connect(argv[1], argv[2]);
for (int i = 0; i<100; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
demo.on_async_testrun();
}
// Demo destructor joins IO thread, making sure all stats are final
The mutex
guarding the statistics is redundant here, but it is GoodPractice(TM), since you may want to test with multiple IO threads.
Output:
avg. Buffer size: 613.82, std.dev. 219.789
avg. b/w: 160.61 mbps, std.dev. 81.061
avg. time: 153.64 μs, std.dev. 39.0163
Full Listing
#include <boost/asio.hpp>
#include <boost/array.hpp>
#include <boost/make_shared.hpp>
#include <boost/bind.hpp>
#include <thread>
#include <mutex>
#include <chrono>
#include <memory>
#include <iostream>
#include <boost/accumulators/accumulators.hpp>
#include <boost/accumulators/statistics.hpp>
using boost::asio::ip::udp;
typedef std::chrono::high_resolution_clock Clock;
namespace demo_results {
using namespace boost::accumulators;
static std::mutex mx;
accumulator_set<double, stats<tag::mean, tag::median, tag::variance> > bufsize, mbps, micros;
}
struct Run {
std::vector<char> buffer = std::vector<char>(rand()%800 + 200, '\0');
int remainingToSend = rand()%10 + 1;
int transferredBuffers = 0;
Clock::time_point start = Clock::now();
Clock::duration elapsed;
void stopTimerAndLog()
{
using namespace std::chrono;
Clock::duration const elapsed = Clock::now() - start;
int tmm = duration_cast<microseconds>(elapsed).count();
double mBitPerSecond = tmm
? buffer.size() * transferredBuffers * 8.0 / 1024 / 1024 / (tmm / 1000000.0)
: std::numeric_limits<double>::infinity();
std::lock_guard<std::mutex> lk(demo_results::mx);
demo_results::bufsize(buffer.size());
demo_results::micros(tmm);
if (tmm)
demo_results::mbps(mBitPerSecond);
#if 0
std::cout << __FUNCTION__ << " -----------------------------------------------\n";
std::cout << __FUNCTION__ << ": " << "Buffer size: " << buffer.size() << "\n";
std::cout << __FUNCTION__ << ": " << "Num Buffers: " << transferredBuffers << "\n";
std::cout << __FUNCTION__ << ": " << "Time: " << tmm << " μs\n";
std::cout << __FUNCTION__ << ": " << mBitPerSecond << " MBit/s\n";
#endif
}
typedef boost::shared_ptr<Run> Ptr;
};
// Owns the io_service, a single IO thread running it, and the UDP socket used
// by all test runs. A work object keeps run() alive between test runs.
struct Demo {
    boost::asio::io_service m_IOService;
    // Keeps m_IOService.run() from returning while no operation is pending.
    std::unique_ptr<boost::asio::io_service::work> m_work;
    // Created by on_connect(); empty until then or if connecting failed.
    std::unique_ptr<boost::asio::ip::udp::socket> m_pSocket;
    boost::asio::ip::udp::endpoint m_ReceiverEndpoint;
    // Declared last so that everything it uses is constructed before it starts.
    std::thread m_io_thread;

    Demo() :
        m_IOService(),
        m_work(new boost::asio::io_service::work(m_IOService)),
        m_io_thread([this] { m_IOService.run(); })
    {
    }

    // Drops the work object so run() returns once all queued operations have
    // completed, then waits for the IO thread to finish.
    ~Demo() {
        m_work.reset();
        m_io_thread.join();
    }

    // Resolves host/port to an IPv4 endpoint and opens the UDP socket.
    // Errors are reported on std::cerr and swallowed.
    // Call this before starting any test run: it replaces the socket and the
    // endpoint without synchronizing with the IO thread.
    void on_connect(std::string const& host, std::string const& port)
    {
        try {
            udp::resolver resolver(m_IOService);
            m_ReceiverEndpoint = *resolver.resolve(udp::resolver::query(udp::v4(), host, port));
            m_pSocket = std::unique_ptr<boost::asio::ip::udp::socket>(new boost::asio::ip::udp::socket(m_IOService));
            m_pSocket->open(udp::v4());
        }
        catch (const std::exception& e)
        {
            std::cerr << e.what() << std::endl;
        }
    }

    // Sends the next buffer of the run, or records the run's statistics when
    // nothing is left to send. Runs on the IO thread only.
    void perform_run(Run::Ptr state) {
        if (!m_pSocket) {
            // on_connect() was not called or failed; there is nothing to send on.
            std::cerr << __FUNCTION__ << ": ERROR: socket is not connected, run dropped\n";
            return;
        }
        if (state->remainingToSend) {
            std::fill(state->buffer.begin(), state->buffer.end(), state->remainingToSend);
            m_pSocket->async_send_to(boost::asio::buffer(state->buffer),
                    m_ReceiverEndpoint,
                    boost::bind(&Demo::handle_sent, this,
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred,
                        state));
        } else {
            state->stopTimerAndLog();
        }
    }

    // Completion handler of one send: updates the run's counters, reports an
    // error if there was one, and continues with the rest of the run.
    void handle_sent(boost::system::error_code const&error, size_t actually_transferred, Run::Ptr state)
    {
        state->transferredBuffers += 1;
        state->remainingToSend    -= 1;

        if (error) {
            // missing error propagation to main thread
            std::cerr << __FUNCTION__ << ": ERROR: Client error while sending (error code = " << error.message() << "): ";
            std::cerr << __FUNCTION__ << ": ERROR: Recovering...";
        } else {
            // Only a successful send must have transferred the whole buffer;
            // checking this before the error test aborted on every failed send.
            assert(actually_transferred == state->buffer.size());
        }

        perform_run(state); // remaining buffers for run
    }

    // Starts a new test run. The run is posted to the io_service instead of
    // being started directly: the caller is on a different thread than the IO
    // thread, and a socket object must not be used from two threads at once.
    void on_async_testrun() {
        m_IOService.post(boost::bind(&Demo::perform_run, this, boost::make_shared<Run>()));
    }
};
// Usage: <program> <host> <port>
// Starts 100 test runs against the given UDP receiver, 10 ms apart, and prints
// mean and standard deviation of buffer size, bandwidth and run time.
// Returns 1 if the command line does not have exactly two arguments.
int main(int argc, char const** argv)
{
    // A real check instead of assert(): asserts vanish under NDEBUG, after
    // which argv[1]/argv[2] would be read without any validation.
    if (argc != 3) {
        std::cerr << "Usage: " << (argc > 0 ? argv[0] : "demo") << " <host> <port>\n";
        return 1;
    }

    {
        Demo demo;
        demo.on_connect(argv[1], argv[2]);

        for (int i = 0; i<100; ++i) {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            demo.on_async_testrun();
        }
    } // Demo destructor joins IO thread, making sure all stats are final

    using namespace boost::accumulators;
    std::cout << "avg. Buffer size: " << mean(demo_results::bufsize) << ", std.dev. " << sqrt(variance(demo_results::bufsize)) << "\n";
    std::cout << "avg. b/w: " << mean(demo_results::mbps) << " mbps, std.dev. " << sqrt(variance(demo_results::mbps)) << "\n";
    std::cout << "avg. time: " << mean(demo_results::micros) << " μs, std.dev. " << sqrt(variance(demo_results::micros)) << "\n";
}
Thank you very much for your answer. This was a very good starting point to improve my code.
I slightly changed the way the async_send_to calls are queued.
// Queues all sends of a run at once; handle_sent is called once per send.
//
// Each send gets its own payload. Asio requires the memory passed to
// async_send_to to remain valid and unchanged until the completion handler
// runs; refilling one shared buffer while earlier sends are still pending
// violates that. The payload is kept alive by the handler that captures it
// and has the same size as state->buffer, which handle_sent checks against.
void perform_run(Run::Ptr state) {
    for (decltype(state->buffersToSend) i = 0; i < state->buffersToSend; i++)
    {
        boost::shared_ptr<std::vector<char> > payload =
            boost::make_shared<std::vector<char> >(state->buffer.size(), static_cast<char>(i));

        m_pSocket->async_send_to(boost::asio::buffer(*payload),
                m_ReceiverEndpoint,
                [this, state, payload](boost::system::error_code const& error, size_t actually_transferred) {
                    handle_sent(error, actually_transferred, state);
                });
    }
}
// Completion handler of one send: counts the buffer, reports an error if there
// was one, and records the run's statistics once all buffers are accounted for.
void handle_sent(boost::system::error_code const&error, size_t actually_transferred, Run::Ptr state)
{
    state->transferredBuffers += 1;

    if (error) {
        // missing error propagation to main thread
        std::cerr << __FUNCTION__ << ": ERROR: Client error while sending (error code = " << error.message() << "): ";
        std::cerr << __FUNCTION__ << ": ERROR: Recovering...";
    } else {
        // Only a successful send must have transferred the whole buffer;
        // checking this before the error test aborted on every failed send.
        assert(actually_transferred == state->buffer.size());
    }

    if (state->transferredBuffers >= state->buffersToSend ) {
        state->stopTimerAndLog();
    }
}
And here is the full code in coliru
Greetings,
Thomas