What is the best way to send many buffers with Boo

Posted 2019-07-23 13:38

Question:

What is the best way to send many buffers with the Boost::Asio method async_send_to? The whole send procedure can be repeated at any time. Furthermore, I want to determine the (correct) elapsed time of each send procedure.

I tried it this way:

//MainWindow.h

class MainWindow : public QMainWindow
{
    Q_OBJECT
public:
    explicit MainWindow(QWidget *parent = 0);
    ~MainWindow();
private slots:
    void on_connectPushButton_clicked();
    void on_asyncSendPushButton_clicked();
private:
    Ui::MainWindow *ui;
    QTime m_Timer;
    int m_BufferSize;
    int m_NumBuffersToSend;
    int m_TransferredBuffers;
    boost::asio::io_service m_IOService;
    std::unique_ptr<boost::asio::ip::udp::socket>     m_pSocket;
    boost::asio::ip::udp::endpoint m_ReceiverEndpoint;
    void handle_send(const boost::system::error_code& error, std::size_t size);
    void stopTimerAndLog();
};

//MainWindow.cpp

#include "MainWindow.h"
#include "ui_MainWindow.h"

//Some Qt includes

#include <boost/timer/timer.hpp>
#include <boost/array.hpp>
#include <boost/bind.hpp>

using boost::asio::ip::udp;

MainWindow::MainWindow(QWidget *parent) :
    QMainWindow(parent),
    ui(new Ui::MainWindow),
    m_BufferSize(0),
    m_NumBuffersToSend(0),
    m_TransferredBuffers(0)
{
    ui->setupUi(this);
}

MainWindow::~MainWindow()
{
    delete ui;
}


void MainWindow::on_connectPushButton_clicked()
{
    try
    {
        udp::resolver resolver(m_IOService);
        udp::resolver::query query(udp::v4(), ui->serverIpAddressLineEdit->text().toStdString(),
                ui->serverPortLineEdit->text().toStdString());
        m_ReceiverEndpoint = *resolver.resolve(query);
        m_pSocket = std::unique_ptr<boost::asio::ip::udp::socket>(new boost::asio::ip::udp::socket(m_IOService));
        m_pSocket->open(udp::v4());
    }
    catch (std::exception& e)
    {
        std::cerr << e.what() << std::endl;
    }
}

void MainWindow::stopTimerAndLog()
{
    int tmm = m_Timer.elapsed();
    double mBitPerSecond = 1000.0 * static_cast<double>(m_BufferSize) * m_NumBuffersToSend
            / ( 1024.0 * 1024.0 * tmm) * 8.0;
    LOG_INFO(__FUNCTION__ << ": " << QString("Buffer size: %1").arg(m_BufferSize).toStdString());
    LOG_INFO(__FUNCTION__ << ": " << QString("Num Buffers: %1").arg(m_NumBuffersToSend).toStdString());
    LOG_INFO(__FUNCTION__ << ": " << QString("Time: %1 ms").arg(tmm).toStdString());
    LOG_INFO(__FUNCTION__ << ": " << QString("%1 MBit/s").arg(mBitPerSecond).toStdString());
    ui->mBitperSecondDoubleSpinBox->setValue(mBitPerSecond);
}

void MainWindow::handle_send(const boost::system::error_code &error, size_t size)
{
    m_TransferredBuffers++;

    if (error)
    {
        //missing error propagation to main thread
        LOG_ERROR(__FUNCTION__ << ": ERROR: Client error while sending (error code = " << error << "): ");
        LOG_ERROR(__FUNCTION__ << ": ERROR: Recovering...");
    }

    if ( m_TransferredBuffers >= m_NumBuffersToSend )
    {
        stopTimerAndLog();
        m_IOService.stop();
    }
}

void MainWindow::on_asyncSendPushButton_clicked()
{
    try
    {
        m_BufferSize = ui->sendBufferSpinBox->value();
        char* data = new char[m_BufferSize];
        memset(data, 0, m_BufferSize);
        m_NumBuffersToSend = ui->numBufferSpinBox->value();

        m_Timer.start();

        for (int i=0; i < m_NumBuffersToSend; i++)
        {
            memset(data, i, m_BufferSize);

            m_pSocket->async_send_to(boost::asio::buffer(data, m_BufferSize),
                    m_ReceiverEndpoint,
                    boost::bind(&MainWindow::handle_send, this,
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred));
        }
        m_TransferredBuffers = 0;
        m_IOService.run();
        delete[] data;
    }
    catch (std::exception& e)
    {
        std::cerr << e.what() << std::endl;
    }
}

As you can see, the user first clicks the connect button (on_connectPushButton_clicked) and then starts the send procedure by clicking the async send button (on_asyncSendPushButton_clicked). There I start the timer and call async_send_to m_NumBuffersToSend times. Then I run the IOService. For each async_send_to, the handler handle_send is called and m_TransferredBuffers is incremented until it reaches m_NumBuffersToSend. When that happens, I stop the timer and the IOService.

But when I compare the time calculated in my program with the UDP datagrams actually sent, as captured with Wireshark, there is always a big difference. How can I get a more accurate time calculation?

Is it possible to place the m_IOService.run(); call outside on_asyncSendPushButton_clicked?

Answer 1:

Well.

I'm not sure what you are observing. Here's the answer to

Q. Is it possible to place the m_IOService.run(); call outside on_asyncSendPushButton_clicked?

Yes, you should use io_service::work to keep the IO service running. Here's a demo program:

Live On Coliru

  • I've created a single IO thread to serve the async operations/completion handlers
  • I've stripped the Qt dependency; demo Runs are configured randomly:

    struct Run {
        std::vector<char> buffer = std::vector<char>(rand()%800 + 200, '\0');
        int remainingToSend      = rand()%10 + 1;
        int transferredBuffers   = 0;
        Clock::time_point start  = Clock::now();
    
        void stopTimerAndLog() const;
    };
    
  • As a bonus, I added proper statistics using Boost Accumulators

  • Instead of doing (expensive) IO in stopTimerAndLog we add the samples to the accumulators:

    void stopTimerAndLog()
    {
        using namespace std::chrono;
    
        Clock::duration const elapsed = Clock::now() - start;
        int tmm = duration_cast<microseconds>(elapsed).count();
    
        double mBitPerSecond = tmm
            ? buffer.size() * transferredBuffers * 8.0 / 1024 / 1024 / (tmm / 1000000.0)
            : std::numeric_limits<double>::infinity();
    
        std::lock_guard<std::mutex> lk(demo_results::mx);
    
        demo_results::bufsize(buffer.size());
        demo_results::micros(tmm);
        if (tmm)
            demo_results::mbps(mBitPerSecond);
    }
    
  • You can run multiple demo Runs in overlap:

    Demo demo;
    demo.on_connect(argv[1], argv[2]);
    
    for (int i = 0; i<100; ++i) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        demo.on_async_testrun();
    }
    // Demo destructor joins IO thread, making sure all stats are final
    
  • The mutex guarding the statistics is redundant but GoodPractice(TM) since you may want to test with multiple IO threads
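
To illustrate the last point without pulling in Boost, here is a rough standard-library sketch of a mutex-guarded running mean and variance. RunningStats and feed_from_threads are hypothetical names, and Welford's update is swapped in for the Boost accumulator_set used in the demo; it is not meant to mirror how Boost Accumulators works internally.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for an accumulator_set: running mean and variance.
class RunningStats {
public:
    // Add one sample; safe to call from several threads at once.
    void add(double x) {
        std::lock_guard<std::mutex> lk(mx_);
        ++n_;
        double const delta = x - mean_;
        mean_ += delta / static_cast<double>(n_);
        m2_ += delta * (x - mean_); // Welford's update of the squared deviations
    }

    std::size_t count() const {
        std::lock_guard<std::mutex> lk(mx_);
        return n_;
    }

    double mean() const {
        std::lock_guard<std::mutex> lk(mx_);
        return mean_;
    }

    // Population variance (divides by n); 0 when no samples were added.
    double variance() const {
        std::lock_guard<std::mutex> lk(mx_);
        return n_ ? m2_ / static_cast<double>(n_) : 0.0;
    }

private:
    mutable std::mutex mx_;
    std::size_t n_ = 0;
    double mean_ = 0.0;
    double m2_ = 0.0;
};

// Feed the samples from `threads` worker threads, mimicking several IO threads
// that all report into the same statistics object.
inline void feed_from_threads(RunningStats& stats,
                              std::vector<double> const& samples,
                              unsigned threads) {
    assert(threads > 0);
    std::vector<std::thread> pool;
    for (unsigned t = 0; t < threads; ++t) {
        pool.emplace_back([&stats, &samples, t, threads] {
            // Thread t handles samples t, t + threads, t + 2*threads, ...
            for (std::size_t i = t; i < samples.size(); i += threads)
                stats.add(samples[i]);
        });
    }
    for (auto& th : pool)
        th.join();
}
```

With a single IO thread the lock is never contended, so it costs very little; it only starts to matter once more than one thread calls add concurrently.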

Output:

avg. Buffer size: 613.82, std.dev. 219.789
avg. b/w:         160.61 mbps, std.dev. 81.061
avg. time:        153.64 μs, std.dev. 39.0163
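
For reference, the bandwidth figure is the number of bytes sent times 8, divided by 1024*1024 and by the elapsed seconds, which is the formula stopTimerAndLog uses. The following is only a minimal standalone sketch of that arithmetic; the function name mbit_per_second and its parameters are made up for illustration and are not part of the demo. Converting to double before multiplying sidesteps integer overflow for large runs.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <limits>

// Throughput in Mbit/s, with 1 Mbit taken as 1024*1024 bits (same convention
// as the formula in stopTimerAndLog). Hypothetical helper for illustration.
inline double mbit_per_second(std::size_t bufferBytes,
                              std::size_t buffersSent,
                              std::chrono::microseconds elapsed) {
    assert(elapsed.count() >= 0);
    if (elapsed.count() == 0)
        return std::numeric_limits<double>::infinity(); // too fast to measure

    // Convert each factor to double first so the product cannot overflow an int.
    double const bits = static_cast<double>(bufferBytes)
                      * static_cast<double>(buffersSent) * 8.0;
    double const seconds = static_cast<double>(elapsed.count()) / 1e6;
    return bits / (1024.0 * 1024.0) / seconds;
}
```

For example, 128 buffers of 1024 bytes each sent in exactly one second come out as 1 Mbit/s under this definition.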

Full Listing

#include <boost/asio.hpp>
#include <boost/array.hpp>
#include <boost/make_shared.hpp>
#include <boost/bind.hpp>
#include <boost/accumulators/accumulators.hpp>
#include <boost/accumulators/statistics.hpp>
#include <algorithm>
#include <cassert>
#include <chrono>
#include <cmath>
#include <cstdlib>
#include <iostream>
#include <limits>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

using boost::asio::ip::udp;
typedef std::chrono::high_resolution_clock Clock;

namespace demo_results {
    using namespace boost::accumulators;

    static std::mutex mx;
    accumulator_set<double, stats<tag::mean, tag::median, tag::variance> > bufsize, mbps, micros;
}

struct Run {
    std::vector<char> buffer = std::vector<char>(rand()%800 + 200, '\0');
    int remainingToSend      = rand()%10 + 1;
    int transferredBuffers   = 0;
    Clock::time_point start  = Clock::now();
    Clock::duration elapsed;

    void stopTimerAndLog()
    {
        using namespace std::chrono;

        Clock::duration const elapsed = Clock::now() - start;
        int tmm = duration_cast<microseconds>(elapsed).count();

        double mBitPerSecond = tmm
            ? buffer.size() * transferredBuffers * 8.0 / 1024 / 1024 / (tmm / 1000000.0)
            : std::numeric_limits<double>::infinity();

        std::lock_guard<std::mutex> lk(demo_results::mx);

        demo_results::bufsize(buffer.size());
        demo_results::micros(tmm);
        if (tmm)
            demo_results::mbps(mBitPerSecond);

#if 0
        std::cout << __FUNCTION__ << "  -----------------------------------------------\n";
        std::cout << __FUNCTION__ << ": " << "Buffer size: " << buffer.size()      << "\n";
        std::cout << __FUNCTION__ << ": " << "Num Buffers: " << transferredBuffers << "\n";
        std::cout << __FUNCTION__ << ": " << "Time: "        << tmm                << " μs\n";
        std::cout << __FUNCTION__ << ": " << mBitPerSecond   << " MBit/s\n";
#endif
    }

    typedef boost::shared_ptr<Run> Ptr;
};

struct Demo {
    boost::asio::io_service                        m_IOService;
    std::unique_ptr<boost::asio::io_service::work> m_work;
    std::unique_ptr<boost::asio::ip::udp::socket>  m_pSocket;
    boost::asio::ip::udp::endpoint                 m_ReceiverEndpoint;
    std::thread                                    m_io_thread;

    Demo() :
        m_IOService(),
        m_work(new boost::asio::io_service::work(m_IOService)),
        m_io_thread([this] { m_IOService.run(); })
    {
    }

    ~Demo() {
        m_work.reset();
        m_io_thread.join();
    }

    void on_connect(std::string const& host, std::string const& port)
    {
        try {
            udp::resolver resolver(m_IOService);
            m_ReceiverEndpoint = *resolver.resolve(udp::resolver::query(udp::v4(), host, port));
            m_pSocket = std::unique_ptr<boost::asio::ip::udp::socket>(new boost::asio::ip::udp::socket(m_IOService));
            m_pSocket->open(udp::v4());
        }
        catch (std::exception& e)
        {
            std::cerr << e.what() << std::endl;
        }
    }

    void perform_run(Run::Ptr state) {
        if (state->remainingToSend) {
            std::fill(state->buffer.begin(), state->buffer.end(), state->remainingToSend);

            m_pSocket->async_send_to(boost::asio::buffer(state->buffer),
                    m_ReceiverEndpoint,
                    boost::bind(&Demo::handle_sent, this,
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred,
                        state));
        } else {
            state->stopTimerAndLog();
        }
    }

    void handle_sent(boost::system::error_code const&error, size_t actually_transferred, Run::Ptr state)
    {
        state->transferredBuffers += 1;
        state->remainingToSend    -= 1;

        if (error) {
            // missing error propagation to main thread
            std::cerr << __FUNCTION__ << ": ERROR: Client error while sending (error code = " << error.message() << ")\n";
            std::cerr << __FUNCTION__ << ": ERROR: Recovering...\n";
        } else {
            // a successful datagram send transfers the whole buffer
            assert(actually_transferred == state->buffer.size());
        }

        perform_run(state); // remaining buffers for run
    }

    void on_async_testrun() {
        perform_run(boost::make_shared<Run>());
    }
};

int main(int argc, char const** argv)
{
    assert(argc==3);

    {
        Demo demo;
        demo.on_connect(argv[1], argv[2]);

        for (int i = 0; i<100; ++i) {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            demo.on_async_testrun();
        }
    } // Demo destructor joins IO thread, making sure all stats are final

    using namespace boost::accumulators;
    std::cout << "avg. Buffer size: " << mean(demo_results::bufsize) << ", std.dev. "      << sqrt(variance(demo_results::bufsize)) << "\n";
    std::cout << "avg. b/w:         " << mean(demo_results::mbps)    << " mbps, std.dev. " << sqrt(variance(demo_results::mbps))    << "\n";
    std::cout << "avg. time:        " << mean(demo_results::micros)  << " μs, std.dev. "   << sqrt(variance(demo_results::micros))  << "\n";
}


Answer 2:

Thank you very much for your answer. It was a very good starting point for improving my code.

I slightly changed how the async_send_to calls are queued:

void perform_run(Run::Ptr state) {

    for(decltype(state->buffersToSend) i = 0; i < state->buffersToSend; i++ )
    {
        std::fill(state->buffer.begin(), state->buffer.end(), i);

        m_pSocket->async_send_to(boost::asio::buffer(state->buffer),
                m_ReceiverEndpoint,
                boost::bind(&Demo::handle_sent, this,
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred,
                    state));
    }
}

void handle_sent(boost::system::error_code const&error, size_t actually_transferred, Run::Ptr state)
{
    state->transferredBuffers += 1;

    if (error) {
        // missing error propagation to main thread
        std::cerr << __FUNCTION__ << ": ERROR: Client error while sending (error code = " << error.message() << ")\n";
        std::cerr << __FUNCTION__ << ": ERROR: Recovering...\n";
    } else {
        // a successful datagram send transfers the whole buffer
        assert(actually_transferred == state->buffer.size());
    }

    if (state->transferredBuffers >= state->buffersToSend) {
        state->stopTimerAndLog();
    }
}
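
One possible caveat with the loop above: every queued async_send_to refers to the same state->buffer, and std::fill rewrites it on each iteration. Asio requires that the memory stays valid until the handler runs, so a datagram that is still pending when the buffer is refilled might go out with different content than intended. A possible way around this, sketched here with the standard library only, is to give each datagram its own buffer. Payload and make_payloads are hypothetical names, not part of the code above, and the actual async_send_to call is left out.

```cpp
#include <cstddef>
#include <memory>
#include <vector>

// One independently owned buffer per datagram (hypothetical alias).
using Payload = std::shared_ptr<std::vector<char>>;

// Build `count` buffers of `bytes` bytes each; buffer i is filled with the
// value i (truncated to char), like the std::fill call in the loop.
inline std::vector<Payload> make_payloads(std::size_t count, std::size_t bytes) {
    std::vector<Payload> payloads;
    payloads.reserve(count);
    for (std::size_t i = 0; i < count; ++i) {
        payloads.push_back(
            std::make_shared<std::vector<char>>(bytes, static_cast<char>(i)));
    }
    return payloads;
}

// Assumed usage, not shown here: pass boost::asio::buffer(*payload) to
// async_send_to and bind the shared_ptr into the completion handler, so each
// buffer stays alive and unmodified until its own send has completed.
```

The cost is one allocation per datagram, which is itself part of what gets timed if the payloads are built after the clock starts.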

And here is the full code in coliru

Greetings,

Thomas



Tags: c++ boost