修改Go GRPC服务器流拦截器上的元数据

4

我一直在尝试在服务器流拦截器上设置元数据,以便实际的RPC函数可以在下游读取它们:

func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
    ss.SendHeader(metadata.New(map[string]string{"X-User-Id": "real_user_id"}))
    return handler(srv, ss)
}

func (server *Server) GetObjects(req *iam.GetObjectsRequest, client iam.Service_GetObjectsServer) error {
    ctx := client.Context()
    userID, ok := HeaderFromMetadata(ctx, "X-User-Id")

    log.Printf("User ID: %s, Ok: %t\n", userID, ok)
    return nil
}

func HeaderFromMetadata(ctx context.Context, headers ...string) (string, bool) {
    meta, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        return "", false
    }

    for _, header := range headers {
        if value := meta.Get(header); len(value) > 0 {
            return value[0], true
        }
    }

    return "", false
}

我的服务器是这样注册的:

server := grpc.NewServer(
    grpc.StreamInterceptor(UserIDInterceptor))
RegisterIAMServer(server, NewServer())

我遇到的问题是找不到用户ID头。我可以看到拦截器在客户端发送请求时被调用,我也可以看到元数据包含头文件,但实际的RPC似乎无法提取它。我在这里做错了什么?

1个回答

5

更新

更简单的解决方案是仅覆盖 ServerStreamContext() 方法。

type serverStream struct {
    grpc.ServerStream
    ctx context.Context
}

func (s *serverStream) Context() context.Context {
    return s.ctx
}

func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
    md, ok := metadata.FromIncomingContext(ss.Context())
    if ok {
        md.Append("X-User-Id", "real_user_id")
    }
    newCtx := metadata.NewIncomingContext(ss.Context(), md)

    return handler(srv, &serverStream{ss, newCtx})
}

更新

另一个简单的解决方案是定义一个包装器来包装grpc.ServerStream,如下:

type serverStreamWrapper struct {
    ss  grpc.ServerStream
    ctx context.Context
}

func (w serverStreamWrapper) Context() context.Context        { return w.ctx }
func (w serverStreamWrapper) RecvMsg(msg interface{}) error   { return w.ss.RecvMsg(msg) }
func (w serverStreamWrapper) SendMsg(msg interface{}) error   { return w.ss.SendMsg(msg) }
func (w serverStreamWrapper) SendHeader(md metadata.MD) error { return w.ss.SendHeader(md) }
func (w serverStreamWrapper) SetHeader(md metadata.MD) error  { return w.ss.SetHeader(md) }
func (w serverStreamWrapper) SetTrailer(md metadata.MD)       { w.ss.SetTrailer(md) }

func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
    md, ok := metadata.FromIncomingContext(ss.Context())
    if ok {
        md.Append("X-User-Id", "real_user_id")
    }
    newCtx := metadata.NewIncomingContext(ss.Context(), md)

    return handler(srv, serverStreamWrapper{ss, newCtx})
}

你可以使用NewIncomingContext来在流中创建当前上下文的副本。

因为没有设置grpc.ServerStreamcontext方法,所以需要使用带有context.ContextwrappedStream来定义,并使用SetContext方法来设置context.ContextServerStream中。

type wrappedStream struct {
    grpc.ServerStream
    ctx context.Context
}

func (w *wrappedStream) SetContext(ctx context.Context) {
    w.ctx = ctx
}

完整的示例代码

type wrappedStream struct {
    grpc.ServerStream
    ctx context.Context
}

func (w *wrappedStream) Context() context.Context {
    return w.ctx
}

func (w *wrappedStream) SetContext(ctx context.Context) {
    w.ctx = ctx
}

func (w *wrappedStream) RecvMsg(m interface{}) error {
    return w.ServerStream.RecvMsg(m)
}

func (w *wrappedStream) SendMsg(m interface{}) error {
    return w.ServerStream.SendMsg(m)
}

type StreamContextWrapper interface {
    grpc.ServerStream
    SetContext(context.Context)
}

func newStreamContextWrapper(ss grpc.ServerStream) StreamContextWrapper {
    ctx := ss.Context()
    return &wrappedStream{
        ss,
        ctx,
    }
}

func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
    md, ok := metadata.FromIncomingContext(ss.Context())
    if ok {
        md.Append("X-User-Id", "real_user_id")
    }
    newCtx := metadata.NewIncomingContext(ss.Context(), md)

    sw := newStreamContextWrapper(ss)
    sw.SetContext(newCtx)

    return handler(srv, sw)
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接