如何在gRPC中正确地设计发布-订阅模式?

13

我尝试使用grpc实现发布-订阅模式,但是如何正确地实现有点困惑。

我的proto:rpc call (google.protobuf.Empty) returns (stream Data);

客户端:

asynStub.call(Empty.getDefaultInstance(), new StreamObserver<Data>() {
         @Override
         public void onNext(Data value) {
           // process a data

         @Override
         public void onError(Throwable t) {

         }

         @Override
         public void onCompleted() {

         }
       });

   } catch (StatusRuntimeException e) {
     LOG.warn("RPC failed: {}", e.getStatus());
   }

   Thread.currentThread().join();

服务器服务:

public class Sender extends DataServiceGrpc.DataServiceImplBase implements Runnable {
  private final BlockingQueue<Data> queue;
  private final static HashSet<StreamObserver<Data>> observers = new LinkedHashSet<>();

  public Sender(BlockingQueue<Data> queue) {
    this.queue = queue;
  }

  @Override
  public void data(Empty request, StreamObserver<Data> responseObserver) {
    observers.add(responseObserver);
  }

  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      try {
        // waiting for first element
        Data data = queue.take();
        // send head element
        observers.forEach(o -> o.onNext(data));

      } catch (InterruptedException e) {
        LOG.error("error: ", e);
        Thread.currentThread().interrupt();
      }
    }
  }
}

如何正确地从全局观察者中移除客户端?当连接断开时,如何接收某种信号?
如何管理客户端-服务器的重新连接?当连接断开时如何强制客户端重新连接?

提前感谢!

1个回答

10
在您的服务实现中:
  @Override
  public void data(Empty request, StreamObserver<Data> responseObserver) {
    observers.add(responseObserver);
  }

你需要获取当前请求的 上下文,并 监听取消请求。对于单请求、多响应调用(也称为服务器流式传输),gRPC生成的代码简化了直接传递请求。这意味着你无法直接访问底层的 ServerCall.Listener,通常你会用它来监听客户端断开连接和取消请求。
相反,每个gRPC调用都有一个关联的Context,它携带了取消和其他与请求相关的信号。对于你的情况,你只需要添加自己的监听器来监听取消请求,然后安全地从你的链接哈希集合中删除响应观察者。
关于重新连接:当连接中断时,gRPC客户端将自动重新连接,但通常不会重试RPC,除非这样做是安全的。在服务器流式传输RPC的情况下,通常不安全,因此您需要直接在客户端上重试RPC。

Carl,谢谢!在grpc之上构建服务器流(发布/订阅)是一个好的实践吗? - Dmitry Zagorulkin
4
可以的。Google的Cloud Pubsub是建立在gRPC之上的,他们的客户端源代码已经公开在GitHub上了。你可以参考一下它们的实现方法。 - Carl Mastrangelo

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接