io_context.run() in a separate thread blocks

2020-05-09 11:49发布

问题:

I have a RESTServer.hpp implemented using boost.beast as shown below.

#pragma once

#include <boost/property_tree/json_parser.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/version.hpp>
#include <boost/asio.hpp>
#include <chrono>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <memory>
#include <string>

namespace beast = boost::beast;
namespace http = beast::http;
namespace net = boost::asio;
using tcp = boost::asio::ip::tcp;

class RESTServer : public std::enable_shared_from_this<RESTServer> {
public:
    RESTServer(tcp::socket socket)
        : m_socket(std::move(socket)) {
    }

    void start() {
        readRequest();
        checkDeadline();
    }

private:
    tcp::socket m_socket;
    beast::flat_buffer m_buffer{8192};
    http::request<http::dynamic_body> m_request;
    http::response<http::dynamic_body> m_response;

    net::steady_timer m_deadline{m_socket.get_executor(), std::chrono::seconds(60)};

    void readRequest() {
        auto self = shared_from_this();
        http::async_read(m_socket, m_buffer, m_request, [self](beast::error_code ec, std::size_t bytes_transferred) {
            boost::ignore_unused(bytes_transferred);
            if (!ec) {
                self->processRequest();
            }
        });
    }

    void processRequest() {
        m_response.version(m_request.version());
        m_response.keep_alive(false);

        switch (m_request.method()) {
            case http::verb::get:
                m_response.result(http::status::ok);
                m_response.set(http::field::server, "Beast");
                createResponse();
                break;

            case http::verb::post:
                m_response.result(http::status::ok);
                m_response.set(http::field::server, "Beast");
                createResponse();
                break;

            default:
                m_response.result(http::status::bad_request);
                m_response.set(http::field::content_type, "text/plain");
                beast::ostream(m_response.body())
                    << "Invalid request-method '"
                    << std::string(m_request.method_string())
                    << "'";
                break;
        }
        writeResponse();
    }

    void createResponse() {
        if(request_.target() == "/count")
        {
            response_.set(http::field::content_type, "text/html");
            beast::ostream(response_.body())
                << "<html>\n"
                <<  "<head><title>Request count</title></head>\n"
                <<  "<body>\n"
                <<  "<h1>Request count</h1>\n"
                <<  "<p>There have been "
                <<  my_program_state::request_count()
                <<  " requests so far.</p>\n"
                <<  "</body>\n"
                <<  "</html>\n";
        }
        else if(request_.target() == "/time")
        {
            response_.set(http::field::content_type, "text/html");
            beast::ostream(response_.body())
                <<  "<html>\n"
                <<  "<head><title>Current time</title></head>\n"
                <<  "<body>\n"
                <<  "<h1>Current time</h1>\n"
                <<  "<p>The current time is "
                <<  my_program_state::now()
                <<  " seconds since the epoch.</p>\n"
                <<  "</body>\n"
                <<  "</html>\n";
        }
        else
        {
            response_.result(http::status::not_found);
            response_.set(http::field::content_type, "text/plain");
            beast::ostream(response_.body()) << "File not found\r\n";
        }
    }

    void writeResponse() {
        auto self = shared_from_this();

        m_response.set(http::field::content_length, m_response.body().size());

        http::async_write(m_socket, m_response,
                          [self](beast::error_code ec, std::size_t) {
                              self->m_socket.shutdown(tcp::socket::shutdown_send, ec);
                              self->m_deadline.cancel();
                          });
    }

    void checkDeadline() {
        auto self = shared_from_this();

        m_deadline.async_wait([self](beast::error_code ec) {
            if (!ec) {
                self->m_socket.close(ec);
            }
        });
    }
};

void httpServer(tcp::acceptor& acceptor, tcp::socket& socket) {
    acceptor.async_accept(socket, [&](beast::error_code ec) {
        if (!ec) {
            std::make_shared<RESTServer>(std::move(socket))->start();
        }
        httpServer(acceptor, socket);
    });
}

I also have a RESTClient RESTClient.hpp and RESTClient.cpp as shown below.

RESTClient.hpp

#pragma once

#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/version.hpp>
#include <boost/asio/strand.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>

namespace beast = boost::beast;
namespace http = beast::http;
namespace net = boost::asio;
using tcp = boost::asio::ip::tcp;


// Performs an HTTP GET and prints the response
class RESTClient : public std::enable_shared_from_this<RESTClient> {

public:
    explicit RESTClient(net::io_context& ioc);

    virtual ~RESTClient();

    virtual void run(char const* host, char const* port, char const* target, int version);

    virtual void onResolve(beast::error_code ec, tcp::resolver::results_type results);

    virtual void onConnect(beast::error_code ec, tcp::resolver::results_type::endpoint_type);

    virtual void onWrite(beast::error_code ec, std::size_t bytes_transferred);

    virtual void onRead(beast::error_code ec, std::size_t bytes_transferred);

private:
    void createGetRequest(char const* host, char const* target, int version);

    void createPostRequest(char const* host, char const* target, int version, char const *body);

    std::string createBody();

    tcp::resolver m_resolver;
    beast::tcp_stream m_stream;
    beast::flat_buffer m_buffer; // (Must persist between reads)
    http::request<http::string_body> m_httpRequest;
    http::response<http::string_body> m_httpResponse;
};

RESTClient.cpp

#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/version.hpp>
#include <boost/asio/strand.hpp>
#include <boost/lexical_cast.hpp>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <string>
#include "RESTClient.hpp"

namespace beast = boost::beast;
namespace http = beast::http;
namespace net = boost::asio;
using tcp = boost::asio::ip::tcp;

void fail(beast::error_code ec, char const* what) {
    std::cerr << what << ": " << ec.message() << "\n";
}


RESTClient::RESTClient(net::io_context& ioc)
    : m_resolver(net::make_strand(ioc)), m_stream(net::make_strand(ioc)) {

}

RESTClient::~RESTClient() = default;


void RESTClient::run(char const* host, char const* port, char const* target, int version) {

    createPostRequest(host, target, version, createBody().c_str());

    m_resolver.async_resolve(host, port, beast::bind_front_handler(
        &RESTClient::onResolve,
        shared_from_this()));
}

void RESTClient::onResolve(beast::error_code ec, tcp::resolver::results_type results) {
    if (ec) {
        return fail(ec, "resolve");
    }

    std::cout << "onResolve ******" << std::endl;
    m_stream.expires_after(std::chrono::seconds(30));

    m_stream.async_connect(results, beast::bind_front_handler(
        &RESTClient::onConnect,
        shared_from_this()));
}

void
RESTClient::onConnect(beast::error_code ec, tcp::resolver::results_type::endpoint_type) {
    if (ec) {
        return fail(ec, "connect");
    }

    std::cout << "onConnect ******" << std::endl;

    m_stream.expires_after(std::chrono::seconds(30));

    http::async_write(m_stream, m_httpRequest,
                      beast::bind_front_handler(
                          &RESTClient::onWrite,
                          shared_from_this()));
}

void
RESTClient::onWrite(beast::error_code ec, std::size_t bytes_transferred) {
    boost::ignore_unused(bytes_transferred);

    if (ec) {
        return fail(ec, "write");
    }

    std::cout << "onWrite ******" << std::endl;

    http::async_read(m_stream, m_buffer, m_httpResponse, beast::bind_front_handler(
        &RESTClient::onRead,
        shared_from_this()));
}

void RESTClient::onRead(beast::error_code ec, std::size_t bytes_transferred) {

    boost::ignore_unused(bytes_transferred);

    if (ec) {
        return fail(ec, "read");
    }

    std::cout << "onRead ******" << std::endl;

    std::cout << m_httpResponse << std::endl;


    m_stream.socket().shutdown(tcp::socket::shutdown_both, ec);

    if (ec && ec != beast::errc::not_connected) {
        return fail(ec, "shutdown");
    }
}

void RESTClient::createGetRequest(char const* host, char const* target, int version) {
    m_httpRequest.version(version);
    m_httpRequest.method(http::verb::get);
    m_httpRequest.target(target);
    m_httpRequest.set(http::field::host, host);
    m_httpRequest.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);
}


void RESTClient::createPostRequest(char const* host, char const* target, int version, char const* body) {
    m_httpRequest.version(version);
    m_httpRequest.method(http::verb::post);
    m_httpRequest.target(target);
    m_httpRequest.set(http::field::host, host);
    m_httpRequest.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);
    m_httpRequest.set(http::field::content_length, boost::lexical_cast<std::string>(strlen(body)));
    m_httpRequest.set(http::field::body, body);
    m_httpRequest.prepare_payload();
}


std::string RESTClient::createBody() {
    boost::property_tree::ptree tree;
    boost::property_tree::read_json("test.json",tree);
    std::basic_stringstream<char> jsonStream;
    boost::property_tree::json_parser::write_json(jsonStream, tree, false);
    std::cout << "json stream :" << jsonStream.str() << std::endl;
    return jsonStream.str();
}


int main(int argc, char** argv) {
    // Check command line arguments.
    if (argc != 4 && argc != 5) {
        std::cerr <<
                  "Usage: http-client-async <host> <port> <target> [<HTTP version: 1.0 or 1.1(default)>]\n" <<
                  "Example:\n" <<
                  "    http-client-async www.example.com 80 /\n" <<
                  "    http-client-async www.example.com 80 / 1.0\n";
        return EXIT_FAILURE;
    }
    auto const host = argv[1];
    auto const port = argv[2];
    auto const target = argv[3];
    int version = argc == 5 && !std::strcmp("1.0", argv[4]) ? 10 : 11;

    // The io_context is required for all I/O
    net::io_context ioc;
    std::cout << "version: " << version << std::endl;

    // Launch the asynchronous operation
    std::make_shared<RESTClient>(ioc)->run(host, port, target, version);

    // Run the I/O service. The call will return when
    // the get operation is complete.
    ioc.run();

    return EXIT_SUCCESS;
}

Now I want to test my RESTClient using googletest. In the unit test, I want to use the RESTServer to simulate the response to the client. My test class is shown below.

class MyTest : public ::testing::Test{
    virtual void SetUp(){
         httpServer(m_acceptor, m_socket);
         m_threads.emplace_back(boost::bind(&boost::asio::io_context::run, &m_ioc));
         m_ioc.run();
    }

    virtual void TearDown() {
    for(auto& thread : m_threads) {
        if(thread.joinable()) {
            thread.join();
        }
    }


    net::ip::address m_address = net::ip::make_address("0.0.0.0");
    unsigned short m_port = static_cast<unsigned short>(8080);
    net::io_context m_ioc{1};

    tcp::acceptor m_acceptor{m_ioc, {m_address, m_port}};
    tcp::socket m_socket{m_ioc};
    std::vector<std::thread> m_threads;

};

My question is the following.

When I implement the class MyTest, I start the RESTServer in a separate thread. Please see the code in SetUp(). When I call m_ioc.run(), the server runs, but blocks. I would expect the server to run in a separate thread, and proceed to execute my test case, where I start the client and do some GET and POST operations.

回答1:

You're calling run in both the thread

m_threads.emplace_back(boost::bind(&boost::asio::io_context::run, &m_ioc));

and the google test thread

m_ioc.run();

which is causing the Setup to block. Try removing m_ioc.run(); since you are already spawning a thread to call io_context::run.