在ServerInterceptor中使用grpc-java异步调用

7

我有一个grpc-java服务器,需要在处理请求之前向身份验证服务发出异步调用。我认为这应该在拦截器中完成,但它要求从interceptCall()同步返回Listener。

class AuthInterceptor implements ServerInterceptor {

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
        ServerCall<ReqT, RespT> call,
        Metadata headers,
        ServerCallHandler<ReqT, RespT> next
    ) {
        String token = ""; //get token from headers
        authService.authorize(token).subscribe(
            ok -> // process the request
            error -> call.close(Status.UNAUTHENTICATED, headers)
        );
        // Here we need to return a Listener, but we haven't started a call yet
    }
}

所以问题是:如何从ServerInterceptor发起异步调用?如果无法实现,那么在grpc中异步验证请求的正确方式是什么?我知道可以直接在grpc服务中使用StreamObservers完成,但请求授权是跨越多个服务的通用问题,拦截器似乎是最佳选择。

2
你能否更新你的问题,展示一下你使用DelayedListener后得到了什么结果吗?我不太清楚如何将它们连接起来。 - Abhijit Sarkar
1个回答

5
你需要返回一个ServerCall.Listener。但由于你不知道要委托给哪个Listener,因此可以覆盖每个方法中的Listener,将回调添加到队列中。认证完成后,清空队列即可。
class DelayedListener<ReqT> extends Listener<ReqT> {
  private Listener<ReqT> delegate;
  private List<Runnable> events = new ArrayList<Runnable>();

  @Override public synchronized void onMessage(ReqT message) {
    if (delegate == null) {
      events.add(() -> delegate.onMessage(message));
    } else {
      delegate.onMessage(message);
    }
  }
  ...
  public synchronized void setDelegate(Listener<ReqT> delegate) {
    this.delegate = delegate;
    for (Runnable runnable : events) {
      runnable.run();
    }
    events = null;
  }
}

你的解决方案非常好,解决了一般性问题。我找到了另一个可能的解决方案:似乎使用SimpleForwardingServerCallListener可以将所有回调正常转发,并仅延迟onHalfClosed()回调,这对我也有效。或者这个解决方案存在线程问题吗? - Eugene
我不清楚DelayedListener是如何工作的。我们是在没有设置委托的情况下返回它,然后authorize回调调用setDelegateserverCallHandler.startCall(call, headers)吗? - Abhijit Sarkar
1
@AbhijitSarkar,是的。它在等待授权完成时排队,然后您可以使用setDelegate()方法来清空队列。 - Eric Anderson
如果在身份验证回调之前没有调用serverCallHandler.startCall,那么onMessage监听器是否仍会被调用?表面上看,似乎在调用开始之前,不应该调用onMessage - Abhijit Sarkar
@AbhijitSarkar,拦截器一被调用就会启动通话。也就是说,在调用call.request()之前,对onMessage()的调用应该自然而然地被延迟,但是这个想法是缓存所有回调函数,而onMessage()只是一个例子。 - Eric Anderson
显示剩余4条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接