编写一个简单的C++ protobuf流客户端/服务器

9

我想使用protobuf在客户端和服务器之间发送消息。在我的情况下,我想从服务器向客户端发送任意数量的protobuf消息。如何快速在C++中构建这个功能?

注意:我写了这个问题及其答案,参考了stackoverflow上Kenton Varda的回答(原文链接)和Fulkerson的回答(原文链接)。其他人也提出过类似的问题并遇到了类似的障碍-请参见这里, 这里, 和 这里

我对protobuf和asio都不熟悉,所以请随意纠正/建议改进,或提供您自己的答案。

2个回答

11
首先,C++ protobuf API缺乏内置支持在单个流/连接上发送多个protobuf消息的功能。Java API具备该功能,但尚未添加到C++版本中。protobuf v2的创建者Kenton Varda曾经友好地发布过C++版本。因此,您需要使用该代码来获得对单个连接的多个消息的支持。
然后,您可以使用boost::asio创建客户端/服务器。不要尝试使用asio提供的istream/ostream风格接口;尽管将其包装并创建protobuf所需的流类型(ZeroCopyInputStream/ZeroCopyOutputStream)更容易,但它不起作用。我并不完全理解其中的原因,但Fulkerson在这个答案中谈到了尝试这样做的脆弱性,并提供了将原始套接字适配为我们所需类型的示例代码。
通过将所有这些组合在一起以及基本的boost::asio教程,以下是客户端和服务器,接着是支持代码。我们正在发送一个名为MyMessage.pb.h中的简单protobuf类persistence::MyMessage的多个实例。请将其替换为您自己的内容。
#include <boost/asio.hpp>
#include "ProtobufHelpers.h"
#include "AsioAdapting.h"
#include "MyMessage.pb.h"
using boost::asio::ip::tcp;
int main()
{
    const char* hostname = "127.0.0.1";
    const char* port = "27015";
    boost::asio::io_service io_service;
    tcp::resolver resolver(io_service);
    tcp::resolver::query query(hostname, port);
    tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
    tcp::socket socket(io_service);
    boost::asio::connect(socket, endpoint_iterator);
    AsioInputStream<tcp::socket> ais(socket);
    CopyingInputStreamAdaptor cis_adp(&ais);
    for (;;)
    {
        persistence::MyMessage myMessage;
        google::protobuf::io::readDelimitedFrom(&cis_adp, &myMessage);
    }
    return 0;
}

服务器:

#include <boost/asio.hpp>
#include "ProtobufHelpers.h"
#include "AsioAdapting.h"
#include "MyMessage.pb.h"
using boost::asio::ip::tcp;
int main()
{
    boost::asio::io_service io_service;
    tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 27015));
    for (;;)
    {
        tcp::socket socket(io_service);
        acceptor.accept(socket);
        AsioOutputStream<boost::asio::ip::tcp::socket> aos(socket); // Where m_Socket is a instance of boost::asio::ip::tcp::socket
        CopyingOutputStreamAdaptor cos_adp(&aos);
        int i = 0;
        do {
            ++i;
            persistence::MyMessage myMessage;
            myMessage.set_myString("hello world");
            myMessage.set_myInt(i);
            google::protobuf::io::writeDelimitedTo(metricInfo, &cos_adp);
            // Now we have to flush, otherwise the write to the socket won't happen until enough bytes accumulate
            cos_adp.Flush(); 
        } while (true);
    }
    return 0;
}

以下是Kenton Varda提供的支持文件:
ProtobufHelpers.h
#pragma once
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream.h>
#include <google/protobuf/message_lite.h>
namespace google {
    namespace protobuf {
        namespace io {
            bool writeDelimitedTo(
                const google::protobuf::MessageLite& message,
                google::protobuf::io::ZeroCopyOutputStream* rawOutput);

            bool readDelimitedFrom(
                google::protobuf::io::ZeroCopyInputStream* rawInput,
                google::protobuf::MessageLite* message);
        }
    }
}

and

ProtobufHelpers.cpp

#include "ProtobufHelpers.h"
namespace google {
    namespace protobuf {
        namespace io {
            bool writeDelimitedTo(
                const google::protobuf::MessageLite& message,
                google::protobuf::io::ZeroCopyOutputStream* rawOutput) {
                // We create a new coded stream for each message.  Don't worry, this is fast.
                google::protobuf::io::CodedOutputStream output(rawOutput);

                // Write the size.
                const int size = message.ByteSize();
                output.WriteVarint32(size);

                uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size);
                if (buffer != NULL) {
                    // Optimization:  The message fits in one buffer, so use the faster
                    // direct-to-array serialization path.
                    message.SerializeWithCachedSizesToArray(buffer);
                }
                else {
                    // Slightly-slower path when the message is multiple buffers.
                    message.SerializeWithCachedSizes(&output);
                    if (output.HadError()) return false;
                }

                return true;
            }

            bool readDelimitedFrom(
                google::protobuf::io::ZeroCopyInputStream* rawInput,
                google::protobuf::MessageLite* message) {
                // We create a new coded stream for each message.  Don't worry, this is fast,
                // and it makes sure the 64MB total size limit is imposed per-message rather
                // than on the whole stream.  (See the CodedInputStream interface for more
                // info on this limit.)
                google::protobuf::io::CodedInputStream input(rawInput);

                // Read the size.
                uint32_t size;
                if (!input.ReadVarint32(&size)) return false;

                // Tell the stream not to read beyond that size.
                google::protobuf::io::CodedInputStream::Limit limit =
                    input.PushLimit(size);

                // Parse the message.
                if (!message->MergeFromCodedStream(&input)) return false;
                if (!input.ConsumedEntireMessage()) return false;

                // Release the limit.
                input.PopLimit(limit);

                return true;
            }
        }
    }
}

感谢Fulkerson的帮助:

AsioAdapting.h

(Asio适配器)

#pragma once
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>

using namespace google::protobuf::io;


template <typename SyncReadStream>
class AsioInputStream : public CopyingInputStream {
public:
    AsioInputStream(SyncReadStream& sock);
    int Read(void* buffer, int size);
private:
    SyncReadStream& m_Socket;
};


template <typename SyncReadStream>
AsioInputStream<SyncReadStream>::AsioInputStream(SyncReadStream& sock) :
    m_Socket(sock) {}


template <typename SyncReadStream>
int
AsioInputStream<SyncReadStream>::Read(void* buffer, int size)
{
    std::size_t bytes_read;
    boost::system::error_code ec;
    bytes_read = m_Socket.read_some(boost::asio::buffer(buffer, size), ec);

    if (!ec) {
        return bytes_read;
    }
    else if (ec == boost::asio::error::eof) {
        return 0;
    }
    else {
        return -1;
    }
}


template <typename SyncWriteStream>
class AsioOutputStream : public CopyingOutputStream {
public:
    AsioOutputStream(SyncWriteStream& sock);
    bool Write(const void* buffer, int size);
private:
    SyncWriteStream& m_Socket;
};


template <typename SyncWriteStream>
AsioOutputStream<SyncWriteStream>::AsioOutputStream(SyncWriteStream& sock) :
    m_Socket(sock) {}


template <typename SyncWriteStream>
bool
AsioOutputStream<SyncWriteStream>::Write(const void* buffer, int size)
{
    boost::system::error_code ec;
    m_Socket.write_some(boost::asio::buffer(buffer, size), ec);
    return !ec;
}

不错的写作 - 看起来有一个“delimited_message_util.h”,现在增加了“SerializeDelimitedToOstream”的功能。它基本上在每个消息之前放置消息的大小。 https://github.com/protocolbuffers/protobuf/blob/master/src/google/protobuf/util/delimited_message_util.h#L69 - GeminiDakota

6

我建议使用gRPC。它支持“流”请求,即客户端和服务器可以随时间发送多个消息以及单个逻辑请求的一部分,这应该适合您的需求。使用gRPC,很多琐碎的设置都已经为您处理好了,您有广泛的文档和教程可供参考,TLS加密已经内置,您还可以获得跨语言支持,轻松添加新种类的请求和并行流等。


你好,Kenton,需要你帮忙解决gRPC的问题。我想了解一下你所说的在任意方向上发送多个消息是指这些消息是串行发送还是通过使用线程生成的不同连接并行发送的? - Vinay Shukla
@VinayShukla 在 gRPC 文档中查找 "streaming"。抱歉,我没有更多信息,我自己也没有使用过它。 - Kenton Varda
感谢您的帮助,如果您能评论一下我的这个问题,我将非常感激。https://dev59.com/ClUL5IYBdhLWcg3wv6Vx - Vinay Shukla

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接