如何在spring-webflux WebFilter中正确使用slf4j MDC

27

我参考了博客文章《使用 Reactor Context 和 MDC 进行上下文日志记录》,但我不知道如何在 WebFilter 中访问 Reactor 上下文。

@Component
public class RequestIdFilter implements WebFilter {

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        List<String> myHeader =  exchange.getRequest().getHeaders().get("X-My-Header");

        if (myHeader != null && !myHeader.isEmpty()) {
            MDC.put("myHeader", myHeader.get(0));
        }

        return chain.filter(exchange);
    }
}
6个回答

16

以下是一个基于最新方法的解决方案,截至 2021年5月,取自官方文档

import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpHeaders;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;
import reactor.util.context.Context;

@Slf4j
@Configuration
public class RequestIdFilter implements WebFilter {

  @Override
  public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
    ServerHttpRequest request = exchange.getRequest();
    String requestId = getRequestId(request.getHeaders());
    return chain
        .filter(exchange)
        .doOnEach(logOnEach(r -> log.info("{} {}", request.getMethod(), request.getURI())))
        .contextWrite(Context.of("CONTEXT_KEY", requestId));
  }

  private String getRequestId(HttpHeaders headers) {
    List<String> requestIdHeaders = headers.get("X-Request-ID");
    return requestIdHeaders == null || requestIdHeaders.isEmpty()
        ? UUID.randomUUID().toString()
        : requestIdHeaders.get(0);
  }

  public static <T> Consumer<Signal<T>> logOnEach(Consumer<T> logStatement) {
    return signal -> {
      String contextValue = signal.getContextView().get("CONTEXT_KEY");
      try (MDC.MDCCloseable cMdc = MDC.putCloseable("MDC_KEY", contextValue)) {
        logStatement.accept(signal.get());
      }
    };
  }

  public static <T> Consumer<Signal<T>> logOnNext(Consumer<T> logStatement) {
    return signal -> {
      if (!signal.isOnNext()) return;
      String contextValue = signal.getContextView().get("CONTEXT_KEY");
      try (MDC.MDCCloseable cMdc = MDC.putCloseable("MDC_KEY", contextValue)) {
        logStatement.accept(signal.get());
      }
    };
  }
}

假设您在 application.properties 中有以下行:

logging.pattern.level=[%X{MDC_KEY}] %5p

那么每次调用端点时,您的服务器日志将包含类似于以下的日志:

2021-05-06 17:07:41.852 [60b38305-7005-4a05-bac7-ab2636e74d94]  INFO 20158 --- [or-http-epoll-6] my.package.RequestIdFilter    : GET http://localhost:12345/my-endpoint/444444/
在响应式上下文中手动记录某些内容时,每次都需要将以下内容添加到响应式链中:

每当您想在响应式上下文中手动记录某些内容时,您需要将以下内容添加到响应式链中:

.doOnEach(logOnNext(r -> log.info("Something")))

如果您想要将X-Request-ID用于分布式跟踪并传播到其他服务,您需要从响应式上下文中读取它(而不是从MDC中读取),并使用以下内容包装您的WebClient代码:

如果你希望X-Request-ID被用于分布式跟踪并在其他服务中传递,你需要从reactive context(响应式上下文)中读取它(而不是从MDC中读取),然后用以下方式包装你的WebClient代码:

Mono.deferContextual(
    ctx -> {
      RequestHeadersSpec<?> request = webClient.get().uri(uri);
      request = request.header("X-Request-ID", ctx.get("CONTEXT_KEY"));
      // The rest of your request logic...
    });

1
MDC 实际上被使用了吗?我们是否可以简单地使用 UUID 并将其通过反应式链传递,或者我还遗漏了其他东西? - Luis Mendoza
1
logOnEach()logOnNext()之间的实际区别是什么 - 动态更改最新内容? - Zon
在日志中没有反映出来,这个解决方案还有效吗? - undefined

10
从Spring Boot 2.2开始,有Schedulers.onScheduleHook功能,使您能够处理MDC。
Schedulers.onScheduleHook("mdc", runnable -> {
    Map<String, String> map = MDC.getCopyOfContextMap();
    return () -> {
        if (map != null) {
            MDC.setContextMap(map);
        }
        try {
            runnable.run();
        } finally {
            MDC.clear();
        }
    };
});

另外,可以使用 Hooks.onEachOperator 通过订阅者上下文传递 MDC 值。

http://ttddyy.github.io/mdc-with-webclient-in-webmvc/

这不是完整的 MDC 解决方案,例如在我的情况下,我无法清理 R2DBC 线程中的 MDC 值。

更新: 这篇文章真正解决了我的 MDC 问题:https://www.novatec-gmbh.de/en/blog/how-can-the-mdc-context-be-used-in-the-reactive-spring-applications/

它提供了基于订阅者上下文更新 MDC 的正确方法。

结合由 AuthenticationWebFilter 填充的 SecurityContext::class.java 键,你将能够将用户登录信息放入日志中。


你把 Schedulers.onScheduleHook 放在哪里? - Marco Lackovic
你可以在Spring配置类中使用@PostConstruct。这篇文章中的方法对我很有效。虽然不确定性能影响如何,但我认为应该没问题。 - Dmytro
很遗憾,https://www.novatec-gmbh.de/en/blog/how-can-the-mdc-context-be-used-in-the-reactive-spring-applications/不再可用。 - undefined

9
您可以按照以下类似的方式操作,您可以使用任何喜欢的类来设置context,在这个例子中,我只是使用了标题 - 但自定义类也可以正常工作。 如果您在此处设置它,则具有处理程序等功能的任何记录器也将访问context
下面的logWithContext设置MDC并在之后清除它。显然,这可以替换为任何您喜欢的内容。
public class RequestIdFilter  implements WebFilter {

    private Logger LOG = LoggerFactory.getLogger(RequestIdFilter.class);

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        HttpHeaders headers = exchange.getRequest().getHeaders();
        return chain.filter(exchange)
                .doAfterSuccessOrError((r, t) -> logWithContext(headers, httpHeaders -> LOG.info("Some message with MDC set")))
                .subscriberContext(Context.of(HttpHeaders.class, headers));
    }

    static void logWithContext(HttpHeaders headers, Consumer<HttpHeaders> logAction) {
        try {
            headers.forEach((name, values) -> MDC.put(name, values.get(0)));
            logAction.accept(headers);
        } finally {
            headers.keySet().forEach(MDC::remove);
        }

    }

}

5
需要反转.subscriberContext.doAfterSuccessOrError,因为上下文写入只对在其上方的操作符可见。 - Simon Baslé
如果我们想要为整个应用程序设置上下文,我们该如何做呢? - S Atah Ahmed Khan
@SimonBaslé,我正在使用Reactor的.log()。在使用.log()记录事件时,MDC数据会丢失。有没有干净的方法在那里获取MDC? - Anoop Hallimala
4
不行。您需要明白,在同一线程上可能会发生多个不同的处理的世界中,MDC已经不再是一个万能解决方案了。 - Simon Baslé
MDC.put是静态的,并使用内部的静态对象来存储值。这会有问题吗? - Hohenheimsenberg

2
2023年:忘掉logOnNext包装器。使用上下文传播
我从2018年开始使用Reactor,到目前为止,在doOnNext内部没有一个真正好的替代方法,你需要手动将Reactor的上下文中的跟踪字段复制到MDC中,创建自己的特定桥梁,将响应式和命令式世界连接起来,然后你的日志就能有意义了。但是事情变了,终于有了一个新的解决方案-上下文传播。让我们来看看它。
想象一下,你有一个Spring服务,你有一组字段来定义你的诊断上下文,这些字段用于跟踪服务的活动。假设你将这组字段存储为以下属性:management.tracing.baggage.correlation.fields: trace, session
现在,要将这些字段自动填充到执行响应式链调用的线程的MDC中,将这些字段作为响应式上下文,您只需要将context-propagation库添加到您的项目中,并在之后进行以下服务范围的配置即可。
/**
 * 1. Will register ThreadLocalAccessors into ContextRegistry for fields listed in application.yml as property value
 * <b>management.tracing.baggage.correlation.fields</b>
 * 2. Enables Automatic Context Propagation for all reactive methods
 *
 * @see <a href="https://github.com/micrometer-metrics/context-propagation">context-propagation</a>
 */
@Configuration
@ConditionalOnClass({ContextRegistry.class, ContextSnapshotFactory.class})
@ConditionalOnProperty(value = "management.tracing.baggage.correlation.fields", matchIfMissing = true)
public class MdcContextPropagationConfiguration {

    public MdcContextPropagationConfiguration(@Value("${management.tracing.baggage.correlation.fields}")
                                              List<String> fields) {
        if (!isEmpty(fields)) {
            fields.forEach(claim -> ContextRegistry.getInstance()
                                                   .registerThreadLocalAccessor(claim,
                                                                                () -> MDC.get(claim),
                                                                                value -> MDC.put(claim, value),
                                                                                () -> MDC.remove(claim)));
            return;
        }

        Hooks.enableAutomaticContextPropagation();
    }
}

这里的诀窍是使用Hooks.enableAutomaticContextPropagation()。一旦我们注册了一组用于传播的ThreadLocalsAccessors,这些访问器将映射出追踪字段,钩子将确保在每次调用链中将注册字段的键下的值从反应式上下文传递到MDC中。
就是这样。

1

基于Reactor 3参考指南方法的解决方案,但使用doOnSuccess而不是doOnEach。

主要思路是以以下方式使用Context进行MDC传播:

  1. 使用上游流程中的MDC状态填充下游Context(将通过派生线程使用)(可以通过.contextWrite(context -> Context.of(MDC.getCopyOfContextMap()))来完成)
  2. 在派生线程中访问下游Context,并使用下游Context中的值填充派生线程中的MDC(主要挑战
  3. 清除下游Context中的MDC(可以通过.doFinally(signalType -> MDC.clear())来完成)

主要问题是在派生线程中访问下游Context。您可以使用最方便的方法实现步骤2。但这是我的解决方案:

webclient.post()
   .bodyValue(someRequestData)
   .retrieve()
   .bodyToMono(String.class)
// By this action we wrap our response with a new Mono and also 
// in parallel fill MDC with values from a downstream Context because
// we have an access to it
   .flatMap(wrapWithFilledMDC())
   .doOnSuccess(response -> someActionWhichRequiresFilledMdc(response)))
// Fill a downstream context with the current MDC state
   .contextWrite(context -> Context.of(MDC.getCopyOfContextMap())) 
// Allows us to clear MDC from derived threads
   .doFinally(signalType -> MDC.clear())
   .block();

// Function which implements second step from the above main idea
public static <T> Function<T, Mono<T>> wrapWithFilledMDC() {
// Using deferContextual we have an access to downstream Context, so
// we can just fill MDC in derived threads with 
// values from the downstream Context
   return item -> Mono.deferContextual(contextView -> {
// Function for filling MDC with Context values 
// (you can apply your action)
      fillMdcWithContextView(contextView);
      return Mono.just(item);
   });
}

public static void fillMdcWithContextValues(ContextView contextView) {
   contextView.forEach(
      (key, value) -> {
          if (key instanceof String keyStr && value instanceof String valueStr) {
             MDC.put(keyStr, valueStr);
          }
   });
}

这种方法也可以应用于 doOnError 和 onErrorResume 方法,因为主要思想是相同的。
使用的版本:
- spring-boot:2.7.3 - spring-webflux:5.3.22(来自spring-boot) - reactor-core:3.4.22(来自spring-webflux) - reactor-netty:1.0.22(来自spring-webflux)

-3

我用以下方式实现了这个:

package com.nks.app.filter;

import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;

/**
 * @author nks
 */
@Component
@Slf4j
public class SessionIDFilter implements WebFilter {
    
    private static final String APP_SESSION_ID = "app-session-id";
    
    /**
     * Process the Web request and (optionally) delegate to the next
     * {@code WebFilter} through the given {@link WebFilterChain}.
     *
     * @param serverWebExchange the current server exchange
     * @param webFilterChain    provides a way to delegate to the next filter
     * @return {@code Mono<Void>} to indicate when request processing is complete
     */
    @Override
    public Mono<Void> filter(ServerWebExchange serverWebExchange, WebFilterChain webFilterChain) {
        serverWebExchange.getResponse()
                .getHeaders().add(APP_SESSION_ID, serverWebExchange.getRequest().getHeaders().getFirst(APP_SESSION_ID));
        MDC.put(APP_SESSION_ID, serverWebExchange.getRequest().getHeaders().getFirst(APP_SESSION_ID));
        log.info("[{}] : Inside filter of SessionIDFilter, ADDED app-session-id in MDC Logs", MDC.get(APP_SESSION_ID));
        return webFilterChain.filter(serverWebExchange);
    }
}

而且,与线程相关的app-session-id值可以被记录。


2
这是不正确的 - 当您的Mono在另一个线程上调度时(例如,在数据库查询之后),MDC将不会为该线程设置。此外,您的代码中没有适当的MDC清理。 - jreznot
@jreznot,你想让我为每个用例编写实现,这不合适也不相关!!!是的,必须进行清理。这只是一个简单的用例,我正在使用它,并且它正常工作。 - N K Shukla
2
我想强调你的解决方案中一个重要问题:在WebFlux响应式代码中直接使用MDC会导致错误的日志记录,因此有解决方案使用不同的Reactor钩子实现正确的行为。 - jreznot

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接