Google::protobuf + boost::asio failure

Posted 2019-05-11 00:40

Question:

I have studied the existing examples:

  1. Sending Protobuf Messages with boost::asio
  2. Reading Protobuf objects using boost::asio::read_async
  3. Google Protocol Buffers: parseDelimitedFrom and writeDelimitedTo for C++
  4. Are there C++ equivalents for the Protocol Buffers delimited I/O functions in Java?
  5. Sending Protobuf Messages with boost::asio

but I still cannot figure out how to pass Google Protobuf messages using the boost::asio API. In particular, I do not clearly understand the following:

  1. How boost::asio::streambuf interacts with the google::protobuf::io objects (and whether the latter are needed at all)
  2. How to implement message streaming correctly (given that the C++ API lacks the writeDelimitedTo and parseDelimitedFrom methods)

Here is my implementation, based on the ssl_client example from boost::asio v. 1.39.

    class client
{
public:
  client(boost::asio::io_service& io_service, boost::asio::ssl::context& context,
      boost::asio::ip::tcp::resolver::iterator endpoint_iterator)
    : socket_(io_service, context),
        request_stream(&b),
        raw_output(&request_stream),
        coded_output(&raw_output)
  {
    ... 
  }

  void handle_connect(const boost::system::error_code& error,
      boost::asio::ip::tcp::resolver::iterator endpoint_iterator)
  {
    ...
  }

  //Debugging function
  void print_buffers_condition(const char *step)
  {
      std::cout << "\nBuffer conditions after " << step << std::endl;
      std::cout << "boost::asio::streambuf\t\tb: " << b.size() << std::endl;
      std::cout << "google::protobuf::io::OstreamOutputStream raw_output: " << raw_output.ByteCount() << std::endl;
      std::cout << "google::protobuf::io::CodedOutputStream coded_output: " << coded_output.ByteCount() << std::endl;
      std::cout << std::endl;
  }

  //Sending test message after SSL Handshake
  void handle_handshake(const boost::system::error_code& error)
  {
      std::cout << "-----------------------------SENDING-----------------------------" << std::endl;
    print_buffers_condition("handle handshake");
    if (!error)
    {
        SearchRequest msg;
        msg.set_query("qwerty");
        msg.set_code(12345);

        std::cout << "Debugged" << std::endl;
        msg.PrintDebugString();


        //Writing the length of the message first, then serializing it
        print_buffers_condition("before serialization");
        coded_output.WriteVarint32(msg.ByteSize());
        if (!msg.SerializeToCodedStream(&coded_output))
        {
            std::cout << "serializing error" << std::endl;
        }
        else
        {
            std::cout << "serializing success" << std::endl;
        }

        //Sending
        print_buffers_condition("before async write");
        boost::asio::async_write(socket_,
                                 b,
                                 boost::bind(&client::handle_write, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
        print_buffers_condition("after async write");
    }
    else
    {
      std::cout << "Handshake failed: " << error << "\n";
    }
  }

  void handle_write(const boost::system::error_code& error,
      size_t bytes_transferred)
  {
    std::cout << " bytes_trransferred: " << bytes_transferred << std::endl;
    if (!error)
    {
        std::cout << "No error" << std::endl;
        ...
    }
    else
    {
      std::cout << "Write failed: " << error << "\n";
    }
  }

  void handle_read(const boost::system::error_code& error,
      size_t bytes_transferred)
  {
    ...
  }

private:
  boost::asio::ssl::stream<boost::asio::ip::tcp::socket> socket_;
  boost::asio::streambuf b;
  std::ostream request_stream;
  google::protobuf::io::OstreamOutputStream raw_output;
  google::protobuf::io::CodedOutputStream coded_output;
};

This code runs: after the message is created, execution reaches handle_write(const boost::system::error_code& error, size_t bytes_transferred). However, printing bytes_transferred gives 0, and the server (also based on the examples) receives nothing.

The output of the debugging function print_buffers_condition(const char *step) suggests that the message is lost somewhere in the stack of buffering objects:

    $ ./client 127.0.0.1 5000
-----------------------------SENDING-----------------------------

Buffer conditions after handle handshake
boost::asio::streambuf      b: 0
google::protobuf::io::OstreamOutputStream raw_output: 8192
google::protobuf::io::CodedOutputStream coded_output: 0

Debugged: 
query: "qwerty"
code: 12345

Buffer conditions after before serialization
boost::asio::streambuf      b: 0
google::protobuf::io::OstreamOutputStream raw_output: 8192
google::protobuf::io::CodedOutputStream coded_output: 0

serializing success

Buffer conditions after before async write
boost::asio::streambuf      b: 0
google::protobuf::io::OstreamOutputStream raw_output: 8192
google::protobuf::io::CodedOutputStream coded_output: 13


Buffer conditions after after async write
boost::asio::streambuf      b: 0
google::protobuf::io::OstreamOutputStream raw_output: 8192
google::protobuf::io::CodedOutputStream coded_output: 13

 bytes_trransferred: 0

I have no idea how to do this properly. The OS is RHEL 6.4. Thank you.

Answer 1:

I'm not familiar with asio, but it looks to me like the problem is that you aren't flushing your buffers. The data is stuck in CodedOutputStream and never finds its way into asio.

CodedOutputStream should be allocated on the stack, such that it is destroyed as soon as you're done writing the message. The destructor will flush the buffer. Note that CodedOutputStream is cheap to allocate so there's no performance problem with putting it on the stack (in fact, it's probably better that way).

OstreamOutputStream can similarly be allocated on the stack, but it heap-allocates a buffer which you might want to reuse. If you choose to reuse the same object, make sure to call Flush() to flush the buffer after the CodedOutputStream is destroyed.

Incidentally, OstreamOutputStream is not particularly efficient, because it has to do its own layer of buffering on top of what ostream is already doing. You may want to serialize to a string (str = message.SerializeAsString() or message.SerializeToString(&str)) and then write that directly to the socket (if asio allows this), as it will probably avoid a redundant copy.
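
The string-based route can be sketched without any protobuf stream classes. The example below is an illustration under stated assumptions, not the library's own code: append_varint32 and frame_payload are hypothetical helpers, and payload stands in for the result of message.SerializeAsString(). The length prefix is hand-encoded as a base-128 varint, which is the encoding that WriteVarint32 in the question produces.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <string>

// Hypothetical helper: appends `value` to `out` as a base-128 varint.
// The low-order 7-bit group goes first; every byte except the last has
// its high bit set.
void append_varint32(std::uint32_t value, std::string& out) {
  while (value >= 0x80) {
    out.push_back(static_cast<char>((value & 0x7F) | 0x80));
    value >>= 7;
  }
  out.push_back(static_cast<char>(value));
}

// Hypothetical helper: returns the varint length prefix followed by the
// payload bytes, as one contiguous string that can be written to a socket.
std::string frame_payload(const std::string& payload) {
  // The prefix is a 32-bit varint, so the payload must fit in 32 bits.
  assert(payload.size() <= std::numeric_limits<std::uint32_t>::max());
  std::string framed;
  framed.reserve(payload.size() + 5);  // a 32-bit varint takes at most 5 bytes
  append_varint32(static_cast<std::uint32_t>(payload.size()), framed);
  framed += payload;
  return framed;
}
```

With asio, the framed string could then be sent with boost::asio::async_write(socket_, boost::asio::buffer(framed), handler), provided framed stays alive (for example as a class member) until the completion handler runs.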