如何利用ZeroMQ编写自己的Protocol Buffers RPC实现

8
根据 Google Protocol Buffers 文档中 "定义服务" 部分的说明,他们说:

也可以使用您自己的 RPC 实现与协议缓冲区一起使用。

据我理解,Protocol Buffers 没有原生实现 RPC。相反,它们提供了一系列必须由用户(也就是我!)实现的抽象接口。因此,我想利用 ZeroMQ 实现这些抽象接口以进行网络通信。
我正试图创建一个使用 ZeroMQ 的 RPC 实现,因为我正在处理的项目已经使用 ZeroMQ 进行基本消息传递(这也是为什么我没有使用文档推荐的 gRPC 的原因)。
经过仔细阅读 Proto 文档后,我发现我需要为自己的实现实现抽象接口RpcChannelRpcController
下面是我目前在 RPC 实现方面所处位置的最小化示例。 .proto 文件: 出于简洁起见省略了 SearchRequest 和 SearchResponse 模式
service SearchService {
    rpc Search (SearchRequest) returns (SearchResponse);
}

SearchServiceImpl.h:

class SearchServiceImpl : public SearchService {
 public:
  void Search(google::protobuf::RpcController *controller,
                    const SearchRequest *request,
                    SearchResponse *response,
                    google::protobuf::Closure *done) override {
    // Static function that processes the request and gets the result
    SearchResponse res = GetSearchResult(request);

    // Call the callback function
    if (done != NULL) {
    done->Run();
    }
    }
  }
};

MyRPCController.h:

class MyRPCController : public google::protobuf::RpcController {
 public:
    MyRPCController();

    void Reset() override;

    bool Failed() const override;

    std::string ErrorText() const override;

    void StartCancel() override;

    void SetFailed(const std::string &reason) override;

    bool IsCanceled() const override;

    void NotifyOnCancel(google::protobuf::Closure *callback) override;
 private:
  bool failed_;
  std::string message_;
};

MyRPCController.cpp - Based off of this

void MyRPCController::Reset() {  failed_ = false; }

bool MyRPCController::Failed() const { return failed_; }

std::string MyRPCController::ErrorText() const { return message_; }

void MyRPCController::StartCancel() { }

void MyRPCController::SetFailed(const std::string &reason) {
  failed_ = true;
  message_ = reason;
}

bool MyRPCController::IsCanceled() const { return false; }

void MyRPCController::NotifyOnCancel(google::protobuf::Closure *callback) { }

MyRPCController::ChiRpcController() : RpcController() { Reset(); }

MyRpcChannel.h:

class MyRPCChannel: public google::protobuf::RpcChannel {
 public:
    void CallMethod(const google::protobuf::MethodDescriptor *method, google::protobuf::RpcController *controller,
                    const google::protobuf::Message *request, google::protobuf::Message *response,
                    google::protobuf::Closure *done) override;
};

我对现有的示例有一些疑问:

  • 我在哪里使用ZeroMQ呢?
    • 看起来应该放到RPCChannel中,因为在我看到的示例中(请参见此处的第3个代码块),它们传递一个字符串,其中包含绑定端口的端口(例如:MyRpcChannel channel("rpc:hostname:1234/myservice");)。
  • 我对我的RPCController实现感到担忧,它似乎过于简单了。这里应该再加一些东西吗?
  • 如何实现RPCChannel,它与SearchServiceImpl非常相似。这些类中的1个虚函数具有非常相似的方法签名,只是它是通用的。

以下是我遇到的一些其他Stack Overflow问题,它们提供了一些有关此主题的有用信息:

  1. Protobuf-Net:实现服务器、rpc控制器和rpc通道 - 这是我找到RPCController实现示例的地方。
  2. 使用Protocol Buffers实现ZeroMQ RPC - 这个答案很有趣,因为在顶部答案中,它似乎建议不使用Protobuf中的RPC格式化.proto文件。
    • 我还注意到在这个文件中,在名为libpbrpc 的存储库中,有同样的想法。
  3. 我是否应该使用现有的实现,例如RPCZ

感谢您的帮助。我希望提供了足够的信息,并且对我所寻求的内容清晰明确。如果有什么不清楚或缺少信息,请告诉我。我很乐意编辑问题。


1
这里也有同样的问题,你最终做了什么? - JoshGC
1个回答

4
  • ZeroMQ提供基于消息的网络通信低级API,可以包含任何数据。
  • ProtoBuffers是一个库,将结构化数据编码为压缩二进制数据并解码这些数据。
  • gRPC是一个RPC框架,为网络通信生成代码,具有使用ProtoBuffers数据交换数据的RPC服务函数。

ZeroMQ和gRPC都支持网络通信,但方式不同。您必须选择ZeroMQ或gRPC进行网络通信。 如果选择ZeroMQ,则可以使用ProtoBuffers对消息进行编码,交换二进制结构化数据。

主要问题在于ProtoBuffers库允许编码和解码变量记录(类似于C/C++联合),可以完全模拟具有函数交换ProtoBuffers消息的RPC服务提供的功能。

因此,选项如下:

  1. 使用发送和接收原语的ZeroMQ,并使用ProtoBuffers编码的变体消息来包含各种子消息,例如
union Request
{
  byte msgType;
  MessageType1 msg1;
  MessageType2 msg2;
  MessageType3 msg3;
}

union Response
{
  byte msgType;
  MessageType3 msg1;
  MessageType4 msg2;
  MessageType5 msg3;
}

send(Request request);
receive(Response response);
使用gRPC生成带有函数的服务,例如:
service MyService 
{
  rpc function1(MessageType1) returns (Response);
  rpc function2(MessageType2) returns (Response);
  rpc function3(MessageType3) returns (Response);

  rpc functionN(MessageType3) returns (MessageType5);
}

(这里可以使用许多组合)

  1. 只使用单个功能的gRPC服务,例如
service MyService 
{
    rpc function(Request) returns (Response);
}

选项可能取决于

  • 客户端的首选目标:基于ZeroMQ或gRPC的客户端
  • 比较ZeroMQ和基于gRPC的服务的性能原因
  • 特定功能,例如如何在ZeroMQ和基于gRPC的服务和客户端中使用/处理订阅(参见如何正确设计grpc中的发布-订阅模式?

对于第一个选项,与第二个选项相比,您需要做很多工作。您必须将发送的消息类型与预期接收的消息类型进行匹配。

如果有其他人将开发客户端,第二个选项将允许更容易/快速地理解提供的服务功能。

为了在ZeroMQ上开发RPC服务,我会定义一个.proto文件来指定函数、参数(所有可能的输入和输出参数)和错误,如下所示:

enum Function 
{
    F1 = 0;
    F2 = 1;
    F3 = 2;
}

enum Error 
{
    E1 = 0;
    E2 = 1;
    E3 = 2;
}

message Request
{ 
    required Function function = 1;
    repeated Input data = 2;
}

message Response
{ 
    required Function function = 1;
    required Error error = 2;
    repeated Output data = 3;
}

message Input
{ 
    optional Input1 data1 = 1;
    optional Input2 data2 = 2;
    ...
    optional InputN dataN = n;
}

message Output
{ 
    optional Output1 data1 = 1;
    optional Output2 data2 = 2;
    ...
    optional OutputN dataN = n;
}

message Message
{
   repeated Request requests;
   repeated Response responses;
}

根据函数ID,需要在运行时检查参数的数量和类型。


1
嗨Flaviu,感谢你抽出时间写下这些内容。这里有很多好的信息。我现在使用ZeroMQ和Protocol Buffers而不是gRPC有些困难,因为我们的项目已经使用了ZeroMQ,他们不想再实现另一个网络。因此,我一直在尝试探索如何正确地集成Protocol Buffers提供的RPC抽象接口,并如何使用ZeroMQ实现它们。 - Vpaladino
1
你好,欢迎。所以你想在 ZeroMQ 之上拥有一个 RPC (远程过程调用) 服务。为此,需要定义两个理论协议,一个用于指定 RPC 相关的内容(函数 ID、参数数量、可能发生的错误),另一个用于指定由 RPC 函数交换的消息(参数)。请查看 XML-RPC(https://en.wikipedia.org/wiki/XML-RPC)和 JSON-RPC(https://en.wikipedia.org/wiki/JSON-RPC)。基于这样的 RPC 定义,您可以编写涵盖 RPC 和消息内容的 .proto 文件 - 我想这是困难的部分。然后生成 ProtoBuffers 代码,将其集成到 ZeroMQ 项目中。 - Flaviu
1
感谢您的回复。据我所知,我认为我可以利用Protobufs来实现相同类型的功能,而不是使用Xml-RPC或Json RPC。通过阅读他们的“服务”文档,听起来似乎是这种情况,这也是我想要实现的。 - Vpaladino
1
或者需要定义两个理论协议,一个用于指定RPC内容(函数ID、参数数量、可能出现的错误),另一个用于指定由RPC函数交换的消息(参数)。从这里可以看出,这个问题可以使用Protobuf模式来解决,但我可能错了。再次感谢您抽出时间回复,我非常感激提供的信息。 - Vpaladino
1
很高兴听到这个消息。关于性能,使用函数的id(枚举)而不是字符串会更快。当需要与枚举相关联的文本时,我使用了https://github.com/cflaviu/decl_enum。此外,可以使用FlatBuffers(https://google.github.io/flatbuffers/)代替ProtoBuffers。它们与ProtoBufffers非常相似。FlatBuffers不压缩数据,因此数据发送/接收更快。 - Flaviu
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接