如何在Java中使用MDC和parallelStream,并且记录到logback中

10

我需要记录请求的一些属性,比如请求ID和语言环境,但当使用parallelStream时,MDC的ThreadLocal似乎会丢失信息。

我分析了在创建parallelStream时在线程之间传递MDC上下文的解决方案,但这似乎很不好,并且我还有很多使用parallelStream的地方。

是否有其他方法可以做到这一点?

谢谢

2个回答

7
我发现唯一的解决办法是将上下文复制到流之外的最终变量中,并在每个迭代中应用它。
Map<String, String> contextMap = MDC.getCopyOfContextMap();
Stream.iterate(0, i -> i + 1).parallel()
    .peek(i -> MDC.setContextMap(contextMap))
    // ...logic...
    // in case you're using a filter, you need to use a predicate and combine it with a clear step:
    filter(yourPredicate.or(i -> {
                MDC.clear();
                return false;
            }))
    // clear right before terminal operation
    .peek(i -> MDC.clear())
    .findFirst();

// since the initial thread is also used within the stream and the context is cleared there, 
// we need to set it again to its initial state
MDC.setContextMap(contextMap);    

那个解决方案的代价是:1)每100次迭代需要几微秒,2)可读性较差,但两者都可以接受:
1. 这是一个基准测试,将IntStream.range(0, 100).parallel().sum()(=基准值)与使用MDC复制逻辑的相同流进行比较:
Benchmark               Mode  Cnt   Score   Error   Units
MDC_CopyTest.baseline  thrpt    5   0,038 ± 0,005  ops/us
MDC_CopyTest.withMdc   thrpt    5   0,024 ± 0,001  ops/us
MDC_CopyTest.baseline   avgt    5  28,239 ± 1,308   us/op
MDC_CopyTest.withMdc    avgt    5  40,178 ± 0,761   us/op

为了提高可读性,可以将其包装成一个小的辅助类:
public class MDCCopyHelper {
    private Map<String, String> contextMap = MDC.getCopyOfContextMap();

    public void set(Object... any) {
        MDC.setContextMap(contextMap);
    }

    public void clear(Object... any) {
        MDC.clear();
    }

    public boolean clearAndFail() {
        MDC.clear();
        return false;
    }
}

流式处理代码看起来更加简洁:

MDCCopyHelper mdcHelper = new MDCCopyHelper();
try {
    Optional<Integer> findFirst = Stream.iterate(0, i -> i + 1)
        .parallel()
        .peek(mdcHelper::set)
        // ...logic...
        // filters predicates should be combined with clear step
        .filter(yourPredicate.or(mdcHelper::clearAndFail))
        // before terminal call:
        .peek(mdcHelper::clear)
        .findFirst();
} finally {
    // set the correct MDC at the main thread again
    mdcHelper.set();
}

如果逻辑内部抛出异常怎么办?这不会让MDC处于脏状态吗?你考虑过异常情况下的解决方案吗? - fasfsfgs
1
不好意思,我不知道如何处理那个。在异常处理方面,流非常糟糕。 - rudi
注意在结尾处使用.peek(mdcHelper::clear)。parallel()之后的所有内容都将在多个线程中执行,但其中包括主线程。主线程不会空闲,它是活跃的。因此,您可能会在主线程中得到一个空的MDC。 - adiesner
1
我之前不知道这一点,所以我跳过了代码中的最后一个命令,因为我认为那是一个无用的额外命令。当并行流可能会抛出异常时,主线程在流执行后也有可能无法恢复MDC。 - adiesner
没错,所以如果预计会出现异常,那么这部分代码也应该放在catch/finally块中。 - rudi
显示剩余2条评论

2

我的解决方案是封装那些功能接口,类似于静态代理设计模式。
例如:

public static void main(String[] args) {
    System.err.println(Thread.currentThread().getName());
    String traceId = "100";
    MDC.put("id", traceId);
    System.err.println("------------------------");
    Stream.of(1, 2, 3, 4)
          .parallel()
          .forEach((num -> {
              System.err.println(Thread.currentThread().getName()+" "+ traceId.equals(MDC.get("id")));
          }));
    System.err.println("------------------------");
    Stream.of(1, 2, 3, 4)
          .parallel()
          // the key is the TraceableConsumer
          .forEach(new TraceableConsumer<>(num -> {
              System.err.println(Thread.currentThread().getName() + " " + traceId.equals(MDC.get("id")));
          }));
}

public class TraceableConsumer<T> extends MDCTraceable implements Consumer<T> {

    private final Consumer<T> target;

    public TraceableConsumer(Consumer<T> target) {
        this.target = target;
    }

    @Override
    public void accept(T t) {
        setMDC();
        target.accept(t);
    }
}

public abstract class MDCTraceable {

    private final Long id;

    private final Long userId;

    public MDCTraceable() {
        id = Optional.ofNullable(MDC.get("id")).map(Long::parseLong).orElse(0L);
        userId = Optional.ofNullable(MDC.get("userId")).map(Long::parseLong).orElse(0L);
    }

    public void setMDC(){
        MDC.put("id", id.toString());
        MDC.put("userId", userId.toString());
    }

    public void cleanMDC(){
        MDC.clear();
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接