更新
更简单的解决方案是只覆盖 `grpc.ServerStream` 的 `Context()` 方法,其余方法通过内嵌直接沿用。
// serverStream embeds a grpc.ServerStream and overrides only Context, so every
// other stream method is promoted from the embedded value.
type serverStream struct {
	grpc.ServerStream
	// ctx is returned by Context in place of the embedded stream's context.
	ctx context.Context
}

// Context returns the context stored in the wrapper rather than the one held
// by the embedded grpc.ServerStream.
func (w *serverStream) Context() context.Context { return w.ctx }
// UserIDInterceptor is a stream server interceptor that adds an "X-User-Id"
// entry to the incoming metadata and invokes handler with a stream whose
// Context carries the updated metadata.
//
// The metadata is rebuilt with metadata.Join, so the map read from the
// original context is never mutated, and the user ID is attached even when
// the call arrived without any metadata.
//
// NOTE(review): values a client already sent under this key are kept ahead of
// the one added here; confirm handlers do not trust the first value.
func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	ctx := ss.Context()

	// A context without metadata yields a nil MD, which Join treats as empty.
	incoming, _ := metadata.FromIncomingContext(ctx)
	md := metadata.Join(incoming, metadata.Pairs("X-User-Id", "real_user_id"))

	return handler(srv, &serverStream{ss, metadata.NewIncomingContext(ctx, md)})
}
更新
另一个简单的解决方案是定义一个包装器来包装 `grpc.ServerStream`,如下:
// serverStreamWrapper pairs a grpc.ServerStream with a replacement context.
// It does not embed the stream; each stream method is forwarded explicitly.
type serverStreamWrapper struct {
	ss  grpc.ServerStream // underlying stream that receives the forwarded calls
	ctx context.Context   // context handed out by Context
}

// Context returns the replacement context stored in the wrapper.
func (s serverStreamWrapper) Context() context.Context {
	return s.ctx
}

// RecvMsg forwards to the underlying stream's RecvMsg.
func (s serverStreamWrapper) RecvMsg(msg interface{}) error {
	return s.ss.RecvMsg(msg)
}

// SendMsg forwards to the underlying stream's SendMsg.
func (s serverStreamWrapper) SendMsg(msg interface{}) error {
	return s.ss.SendMsg(msg)
}

// SendHeader forwards to the underlying stream's SendHeader.
func (s serverStreamWrapper) SendHeader(md metadata.MD) error {
	return s.ss.SendHeader(md)
}

// SetHeader forwards to the underlying stream's SetHeader.
func (s serverStreamWrapper) SetHeader(md metadata.MD) error {
	return s.ss.SetHeader(md)
}

// SetTrailer forwards to the underlying stream's SetTrailer.
func (s serverStreamWrapper) SetTrailer(md metadata.MD) {
	s.ss.SetTrailer(md)
}
// UserIDInterceptor is a stream server interceptor that adds an "X-User-Id"
// entry to the incoming metadata and invokes handler with a
// serverStreamWrapper whose Context carries the updated metadata.
//
// The metadata is rebuilt with metadata.Join, so the map read from the
// original context is never mutated, and the user ID is attached even when
// the call arrived without any metadata.
//
// NOTE(review): values a client already sent under this key are kept ahead of
// the one added here; confirm handlers do not trust the first value.
func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	ctx := ss.Context()

	// A context without metadata yields a nil MD, which Join treats as empty.
	incoming, _ := metadata.FromIncomingContext(ctx)
	md := metadata.Join(incoming, metadata.Pairs("X-User-Id", "real_user_id"))

	return handler(srv, serverStreamWrapper{ss, metadata.NewIncomingContext(ctx, md)})
}
你可以使用 `metadata.NewIncomingContext`,基于流的当前上下文创建一个携带新元数据的上下文副本。
由于 `grpc.ServerStream` 没有提供设置上下文的方法,因此需要定义一个带有 `context.Context` 字段的 `wrappedStream`,并为它添加 `SetContext` 方法,用来把新的 `context.Context` 设置到包装后的 `ServerStream` 中。
// wrappedStream embeds a grpc.ServerStream and carries a context that can be
// replaced after construction.
type wrappedStream struct {
	grpc.ServerStream
	ctx context.Context // assigned by SetContext
}

// SetContext stores ctx in the wrapper's ctx field.
func (ws *wrappedStream) SetContext(ctx context.Context) { ws.ctx = ctx }
完整的示例代码
// wrappedStream embeds a grpc.ServerStream and substitutes a replaceable
// context for the stream's own.
//
// Only Context and SetContext are declared here. RecvMsg, SendMsg and the
// remaining ServerStream methods are promoted from the embedded stream, so
// explicit forwarding methods are unnecessary.
type wrappedStream struct {
	grpc.ServerStream
	// ctx is returned by Context and replaced by SetContext.
	ctx context.Context
}

// Context returns the context stored in the wrapper instead of the embedded
// stream's context.
func (w *wrappedStream) Context() context.Context {
	return w.ctx
}

// SetContext replaces the context that Context returns.
func (w *wrappedStream) SetContext(ctx context.Context) {
	w.ctx = ctx
}
// StreamContextWrapper is a grpc.ServerStream that additionally exposes
// SetContext.
type StreamContextWrapper interface {
	grpc.ServerStream

	// SetContext sets the context associated with the stream.
	SetContext(ctx context.Context)
}
// newStreamContextWrapper wraps ss in a *wrappedStream whose context is
// initialised to ss.Context().
func newStreamContextWrapper(ss grpc.ServerStream) StreamContextWrapper {
	return &wrappedStream{ServerStream: ss, ctx: ss.Context()}
}
// UserIDInterceptor is a stream server interceptor that adds an "X-User-Id"
// entry to the incoming metadata, installs the resulting context on a
// StreamContextWrapper around ss, and invokes handler with that wrapper.
//
// The metadata is rebuilt with metadata.Join, so the map read from the
// original context is never mutated, and the user ID is attached even when
// the call arrived without any metadata.
//
// NOTE(review): values a client already sent under this key are kept ahead of
// the one added here; confirm handlers do not trust the first value.
func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	ctx := ss.Context()

	// A context without metadata yields a nil MD, which Join treats as empty.
	incoming, _ := metadata.FromIncomingContext(ctx)
	md := metadata.Join(incoming, metadata.Pairs("X-User-Id", "real_user_id"))

	sw := newStreamContextWrapper(ss)
	sw.SetContext(metadata.NewIncomingContext(ctx, md))
	return handler(srv, sw)
}