MDC Logger与CompletableFuture

3

我正在使用MDC Logger,它对我非常有效,除了一个问题。无论在代码中的哪个位置我们使用了CompletableFuture,在创建的线程中,MDC数据都没有传递到下一个线程,导致日志失败。例如,在代码中,我使用了下面的片段来创建新线程。

最初的回答:

在使用CompletableFuture时,MDC数据不能正确地传递到下一个线程,导致日志失败。以下是我用于创建新线程的代码片段:

CompletableFuture.runAsync(() -> getAcountDetails(user));

最初的回答如下:

日志结果如下

2019-04-29 11:44:13,690 INFO  | /app/rest/controller/userdetails | f80fdc1f-8123-3932-a405-dda2dc2a80d5 |[http-nio-8182-exec-5] RestServiceExecutor:  service: 
2019-04-29 11:44:13,690 INFO  | /app/rest/controller/userdetails | f80fdc1f-8123-3932-a405-dda2dc2a80d5 |[http-nio-8182-exec-5] RestServiceExecutor: 
2019-04-29 11:44:13,779 INFO  | /app/rest/controller/userdetails | f80fdc1f-8123-3932-a405-dda2dc2a80d5 |[http-nio-8182-exec-5] UserDetailsRepoImpl: 
2019-04-29 11:44:13,950 INFO   [ForkJoinPool.commonPool-worker-3] RestServiceExecutor:  header: 
2019-04-29 11:44:13,950 INFO   [ForkJoinPool.commonPool-worker-3] RestServiceExecutor:  service: 
2019-04-29 11:44:14,012 INFO   [ForkJoinPool.commonPool-worker-3] CommonMasterDataServiceImpl: Cache: Retrieving Config Data details.
2019-04-29 11:44:14,028 INFO   [ForkJoinPool.commonPool-worker-3] CommonMasterDataServiceImpl: Cache: Retrieved Config Data details : 1
2019-04-29 11:44:14,028 INFO   [ForkJoinPool.commonPool-worker-3] CommonMasterDataServiceImpl: Cache: Retrieving Config Data details.
2019-04-29 11:44:14,033 INFO   [ForkJoinPool.commonPool-worker-3] CommonMasterDataServiceImpl: Cache: Retrieved Config Data details : 1
2019-04-29 11:44:14,147 INFO  | /app/rest/controller/userdetails | f80fdc1f-8123-3932-a405-dda2dc2a80d5 |[http-nio-8182-exec-5] SecondaryCacheServiceImpl: Fetching from secondary cache
2019-04-29 11:44:14,715 INFO  | /app/rest/controller/userdetails | f80fdc1f-8123-3932-a405-dda2dc2a80d5 |[http-nio-8182-exec-5] CommonMasterDataServiceImpl: Cache: Retrieving Config Data details.
2019-04-29 11:44:14,749 INFO  | /app/rest/controller/userdetails | f80fdc1f-8123-3932-a405-dda2dc2a80d5 |[http-nio-8182-exec-5]

以下是我的MDC数据,但在线程[ForkJoinPool.commonPool-worker-3]中无法传递。

最初的回答:

下面是我的MDC数据,但它无法与线程一起传递。

| /app/rest/controller/userdetails | f80fdc1f-8123-3932-a405-dda2dc2a80d5 |

以下是我的logback.xml配置,其中sessionID是MDC数据。将MDC添加到日志记录中可以轻松地跟踪特定会话的日志信息。
<configuration scan="true">
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <charset>utf-8</charset>
            <Pattern>%d %-5level %X{sessionID} [%thread] %logger{0}: %msg%n</Pattern>
        </encoder>
    </appender>
</configuration>

最初的回答:我尝试了下面的链接。这对于TaskExecutor非常有效,但我没有找到任何适用于CompletableFuture的解决方案。
我尝试了下面的链接http://shengwangi.blogspot.com/2015/09/using-log-mdc-in-multi-thread-helloworld-example.html?_sm_au_=iVVrZDSwwf0vP6MR。这个链接非常适用于TaskExecutor。但是,我尚未找到适用于CompletableFuture的解决方案。

1
请查看此链接中的内容,与编程相关。 - Michał Krzywański
2个回答

7
创建包装方法
static CompletableFuture<Void> myMethod(Runnable runnable) {
    Map<String, String> previous = MDC.getCopyOfContextMap();
    return CompletableFuture.runAsync(() -> {
        MDC.setContextMap(previous);
        try {
            runnable.run();
        } finally {
            MDC.clear();
        }
    });
}

使用它来代替CompletableFuture.runAsync


但不要把它称为myMethod... 可以使用runAsyncWithMDC或类似的名称。 - Thilo
2
谢谢Talex,这应该是有效的,但问题是我们在多个地方都使用了CompletableFuture,所以可能需要在每个地方进行更新,对吗? - Mayur
由于您使用静态方法来运行异步调用,因此无法拦截它。因此,您必须修改每个调用。 - talex

2

我的解决方案主题将是(它将与JDK 9+一起工作,因为自那个版本以来公开了一些可重写的方法)

使整个生态系统都能意识到MDC

为此,我们需要解决以下情况:

  • 我们从这个类中获取了哪些CompletableFuture的新实例? → 我们需要返回一个MDC感知版本。
  • 我们从这个类外部获取了哪些CompletableFuture的新实例? → 我们需要返回一个MDC感知版本。
  • 在CompletableFuture类中使用了哪个执行器? → 在所有情况下,我们需要确保所有执行器都具有MDC感知功能。

为此,让我们通过扩展它来创建一个MDC感知版本的CompletableFuture。我的版本如下所示:

import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.function.Supplier;

public class MDCAwareCompletableFuture<T> extends CompletableFuture<T> {

    public static final ExecutorService MDC_AWARE_ASYNC_POOL = new MDCAwareForkJoinPool();

    @Override
    public CompletableFuture newIncompleteFuture() {
        return new MDCAwareCompletableFuture();
    }

    @Override
    public Executor defaultExecutor() {
        return MDC_AWARE_ASYNC_POOL;
    }

    public static <T> CompletionStage<T> getMDCAwareCompletionStage(CompletableFuture<T> future) {
        return new MDCAwareCompletableFuture<>()
                .completeAsync(() -> null)
                .thenCombineAsync(future, (aVoid, value) -> value);
    }

    public static <T> CompletionStage<T> getMDCHandledCompletionStage(CompletableFuture<T> future,
                                                                Function<Throwable, T> throwableFunction) {
        Map<String, String> contextMap = MDC.getCopyOfContextMap();
        return getMDCAwareCompletionStage(future)
                .handle((value, throwable) -> {
                    setMDCContext(contextMap);
                    if (throwable != null) {
                        return throwableFunction.apply(throwable);
                    }
                    return value;
                });
    }
}

MDCAwareForkJoinPool类将如下所示(为简单起见,省略了带有ForkJoinTask参数的方法)

public class MDCAwareForkJoinPool extends ForkJoinPool {
    //Override constructors which you need

    @Override
    public <T> ForkJoinTask<T> submit(Callable<T> task) {
        return super.submit(MDCUtility.wrapWithMdcContext(task));
    }

    @Override
    public <T> ForkJoinTask<T> submit(Runnable task, T result) {
        return super.submit(wrapWithMdcContext(task), result);
    }

    @Override
    public ForkJoinTask<?> submit(Runnable task) {
        return super.submit(wrapWithMdcContext(task));
    }

    @Override
    public void execute(Runnable task) {
        super.execute(wrapWithMdcContext(task));
    }
}

实用的方法来包装应该是这样的:
public static <T> Callable<T> wrapWithMdcContext(Callable<T> task) {
    //save the current MDC context
    Map<String, String> contextMap = MDC.getCopyOfContextMap();
    return () -> {
        setMDCContext(contextMap);
        try {
            return task.call();
        } finally {
            // once the task is complete, clear MDC
            MDC.clear();
        }
    };
}

public static Runnable wrapWithMdcContext(Runnable task) {
    //save the current MDC context
    Map<String, String> contextMap = MDC.getCopyOfContextMap();
    return () -> {
        setMDCContext(contextMap);
        try {
            task.run();
        } finally {
            // once the task is complete, clear MDC
            MDC.clear();
        }
    };
}

public static void setMDCContext(Map<String, String> contextMap) {
   MDC.clear();
   if (contextMap != null) {
       MDC.setContextMap(contextMap);
    }
}

以下是使用指南:
  • 使用类MDCAwareCompletableFuture而不是类CompletableFuture
  • CompletableFuture中的一些方法实例化了自身版本,例如new CompletableFuture...。对于这些方法(大多数公共静态方法),请使用替代方法来获取MDCAwareCompletableFuture的实例。使用替代方法的示例可能是,而不是使用CompletableFuture.supplyAsync(...),您可以选择new MDCAwareCompletableFuture<>().completeAsync(...)
  • 如果由于某些外部库返回CompletableFuture的实例而陷入困境,请使用getMDCAwareCompletionStage方法将其转换为MDCAwareCompletableFuture。显然,您无法在该库中保留上下文,但是此方法仍将在代码进入应用程序代码后保留上下文。
  • 在提供执行程序作为参数时,请确保它是MDC Aware,例如MDCAwareForkJoinPool。您还可以通过覆盖execute方法来创建MDCAwareThreadPoolExecutor以符合您的用例。你懂的!

您可以在此处找到上述所有内容的详细说明。

有了这个,您的代码可能看起来像:

new MDCAwareCompletableFuture<>().completeAsync(() -> {
            getAcountDetails(user);
            return null;
        });

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接