谷歌::protobuf + boost::asio 失败

4

我已经研究了以下现有的示例:

  1. 使用 boost::asio 发送 Protobuf 消息
  2. 使用 boost::asio::read_async 读取 Protobuf 对象
  3. Google Protocol Buffers: C++ 中的 parseDelimitedFrom 和 writeDelimitedTo
  4. Java 中 Protocol Buffers delimited I/O 函数在 C++ 中是否有相应的函数?
  5. 使用 boost::asio 发送 Protobuf 消息

但是我仍然无法弄清楚如何使用 Boost::asio API 传递 Google Protobuf 消息。特别是我对以下问题没有清晰的理解:

  1. boost::asio::streambuf 和 google::protobuf::io 对象之间的交互(以及应用后者的必要性)
  2. 消息流的正确实现(由于 C++ API 中缺乏 writeDelimitedTo 和 parseDelimitedFrom 方法)

这是我基于 Boost::asio v. 1.39 ssl_client 的实现,来自示例

    class client
{
public:
  client(boost::asio::io_service& io_service, boost::asio::ssl::context& context,
      boost::asio::ip::tcp::resolver::iterator endpoint_iterator)
    : socket_(io_service, context),
        request_stream(&b),
        raw_output(&request_stream),
        coded_output(&raw_output)
  {
    ... 
  }

  void handle_connect(const boost::system::error_code& error,
      boost::asio::ip::tcp::resolver::iterator endpoint_iterator)
  {
    ...
  }

  //Debugging function
  void print_buffers_condition(const char *step)
  {
      std::cout << "\nBuffer conditions after " << step << std::endl;
      std::cout << "boost::asio::streambuf\t\tb: " << b.size() << std::endl;
      std::cout << "google::protobuf::io::OstreamOutputStream raw_output: " << raw_output.ByteCount() << std::endl;
      std::cout << "google::protobuf::io::CodedOutputStream coded_output: " << coded_output.ByteCount() << std::endl;
      std::cout << std::endl;
  }

  //Sending test message after SSL Handshake
  void handle_handshake(const boost::system::error_code& error)
  {
      std::cout << "-----------------------------SENDING-----------------------------" << std::endl;
    print_buffers_condition("handle handshake");
    if (!error)
    {
        SearchRequest msg;
        msg.set_query("qwerty");
        msg.set_code(12345);

        std::cout << "Debugged" << std::endl;
        msg.PrintDebugString();


        //Writing the length of the message before and serializing                 
                    print_buffers_condition("before serialising");
        coded_output.WriteVarint32(msg.ByteSize());
        if (!msg.SerializeToCodedStream(&coded_output))
        {
            std::cout << "serailizing error" << std::endl;
        }
        else
        {
            std::cout << "serializing success" << std::endl;
        }

        //Sending
        buffers_condition("before async write");
        boost::asio::async_write(socket_,
                                 b,
                                 boost::bind(&client::handle_write, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
        buffers_condition("after async write");
    }
    else
    {
      std::cout << "Handshake failed: " << error << "\n";
    }
  }

  void handle_write(const boost::system::error_code& error,
      size_t bytes_transferred)
  {
    std::cout << " bytes_trransferred: " << bytes_transferred << std::endl;
    if (!error)
    {
        std::cout << "No error" << std::endl;
        ...
    }
    else
    {
      std::cout << "Write failed: " << error << "\n";
    }
  }

  void handle_read(const boost::system::error_code& error,
      size_t bytes_transferred)
  {
    ...
  }

private:
  boost::asio::ssl::stream<boost::asio::ip::tcp::socket> socket_;
  boost::asio::streambuf b;
  std::ostream request_stream;
  google::protobuf::io::OstreamOutputStream raw_output;
  google::protobuf::io::CodedOutputStream coded_output;
};

这段代码是可行的,因此在创建消息后,我们进入了 void handle_write(const boost::system::error_code& error, size_t bytes_transferred) 函数。打印 bytes_transferred_ 值返回 0:服务器(也是基于 示例 实现)没有接收到任何内容。

使用调试函数 void print_buffers_condition(const char *step) 暗示在通过不同缓冲对象的堆栈传输消息时出现了丢失:

    $ ./client 127.0.0.1 5000
-----------------------------SENDING-----------------------------

Buffer conditions after handle handshake
boost::asio::streambuf      b: 0
google::protobuf::io::OstreamOutputStream raw_output: 8192
google::protobuf::io::CodedOutputStream coded_output: 0

Debugged: 
query: "qwerty"
code: 12345

Buffer conditions after before serialization
boost::asio::streambuf      b: 0
google::protobuf::io::OstreamOutputStream raw_output: 8192
google::protobuf::io::CodedOutputStream coded_output: 0

serializing success

Buffer conditions after before async write
boost::asio::streambuf      b: 0
google::protobuf::io::OstreamOutputStream raw_output: 8192
google::protobuf::io::CodedOutputStream coded_output: 13


Buffer conditions after after async write
boost::asio::streambuf      b: 0
google::protobuf::io::OstreamOutputStream raw_output: 8192
google::protobuf::io::CodedOutputStream coded_output: 13

 bytes_trransferred: 0

我不知道怎样以正确的方式做到这一点。

操作系统是RHEL 6.4。

谢谢。

1个回答

4
我不熟悉asio,但我认为问题在于您没有清空缓冲区。数据被卡在CodedOutputStream中,无法进入asio。
应该在堆栈上分配CodedOutputStream,这样一旦完成消息写入就会立即销毁。析构函数将刷新缓冲区。请注意,CodedOutputStream的分配成本很低,因此将其放在堆栈上没有性能问题(实际上,这可能更好)。
OstreamOutputStream也可以在堆栈上分配,但它会堆分配缓冲区,您可能希望重用它。如果选择重用相同的对象,请确保在销毁CodedOutputStream后调用Flush()以刷新缓冲区。
顺便说一句,OstreamOutputStream不是特别高效,因为它必须在ostream已经执行的基础上进行自己的缓冲层。您可能希望序列化为字符串(str = message.SerializeAsString()或message.SerializeToString(& str)),然后直接将其写入套接字(如果asio允许),因为它可能避免了冗余复制。

谢谢您的回复。说实话,我无法想象缓冲区的这种行为,所以我将尝试按照您提供的方式直接将它们放入函数中。不幸的是,我不知道如何同时使用 SerializeToString(...) 函数和包含消息长度的数字(在 Google 的术语中为 varint),因为后者也应该被序列化。这是在 Techniques 中规定的。 - Vitaly Isaev
2
啊,说得好。这有点复杂。基本上你要做的是先确定消息的大小,然后分配一个char数组进行序列化,然后创建一个CodedOutputStream,直接将其写入该数组而不是ZeroCopyOutputStream。CodedOutputStream有用于确定varints大小的静态方法,因此您需要将其添加到消息的“ByteSize()”中。这一切都变得有点丑陋,所以我只会在您认为性能很重要时才这样做。 - Kenton Varda

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接