如何在Spring WebFlux中记录请求和响应体

59

我希望在使用Kotlin的Spring WebFlux REST API中拥有请求和响应的集中式日志记录。到目前为止,我尝试了以下方法:

@Bean
fun apiRouter() = router {
    (accept(MediaType.APPLICATION_JSON) and "/api").nest {
        "/user".nest {
            GET("/", userHandler::listUsers)
            POST("/{userId}", userHandler::updateUser)
        }
    }
}.filter { request, next ->
    logger.info { "Processing request $request with body ${request.bodyToMono<String>()}" }
    next.handle(request).doOnSuccess { logger.info { "Handling with response $it" } }
}

这里请求方法和路径日志已经成功,但是内容是 Mono,我该如何记录它呢?是不是要反过来,在请求体 Mono 上订阅并在回调函数中记录?还有一个问题是这里的 ServerResponse 接口没有访问响应体的权限。我如何在这里获取它?


另一种尝试是使用 WebFilter

@Bean
fun loggingFilter(): WebFilter =
        WebFilter { exchange, chain ->
            val request = exchange.request
            logger.info { "Processing request method=${request.method} path=${request.path.pathWithinApplication()} params=[${request.queryParams}] body=[${request.body}]"  }

            val result = chain.filter(exchange)

            logger.info { "Handling with response ${exchange.response}" }

            return@WebFilter result
        }

我也遇到了同样的问题:请求体是Flux,但没有响应体。

有没有一种方法可以从某些过滤器中访问完整的请求和响应以进行日志记录?我理解错了什么吗?


1
这篇文章包含了记录请求体的代码(Java)- https://dev59.com/7VIH5IYBdhLWcg3wA3t5 - Oten
12个回答

25

这与Spring MVC中的情况或多或少相似。

在Spring MVC中,您可以使用AbstractRequestLoggingFilter过滤器和ContentCachingRequestWrapper和/或ContentCachingResponseWrapper。这里有很多权衡:

  • 如果您想要访问servlet请求属性,则需要实际读取和解析请求正文
  • 记录请求正文意味着缓冲请求正文,这可能会使用大量内存
  • 如果您想要访问响应正文,则需要包装响应并缓冲正在编写的响应正文以供以后检索

ContentCaching*Wrapper类在WebFlux中不存在,但您可以创建类似的类。但请记住其他要点:

  • 在内存中缓冲数据某种程度上违反了反应堆栈,因为我们试图尽可能高效地利用可用资源
  • 您不应该篡改实际的数据流,并且比预期刷新更多/更少,否则您将冒着破坏流使用案例的风险
  • 在那个级别上,您只能访问DataBuffer实例,它们是(粗略地)内存高效的字节数组。这些属于缓冲池,并且被回收以供其他交换使用。如果这些没有正确保留/释放,则会创建内存泄漏(并且将数据缓冲以供以后使用肯定符合该场景)
  • 同样在那个级别上,只有字节,您没有访问任何编解码器来解析HTTP正文的权限。如果一开始就无法读取内容,则应忘记缓冲内容

对您问题的其他答案:

  • 是的,WebFilter可能是最好的方法
  • 否则,您将消耗处理程序无法读取的数据;您可以在请求上flatMap并在doOn操作员中缓冲数据
  • 包装响应应该使您能够访问正在编写的响应正文;但不要忘记内存泄漏

5
感谢您提供详细的回答。看起来这样高级别的过滤和记录与核心的反应式思想背道而驰,我应该考虑将日志记录移到业务层面(至少针对响应)。 - Koguro
1
@brian-clozel,你说的“在请求上使用flatMap”是什么意思?能详细说明一下吗? - javabeats
你能详细说明一下retain/release模型吗?我在StringDecoder中看到它被使用,但不太理解。PooledDataBuffer文档在这方面没有用处。 - Abhijit Sarkar
1
关于“为什么要这样做”的问题:我有一个使用案例,我们必须为了可见性目的持久化每个“消息”(请求/响应下行和上行)。我可以在我的控制器方法中接受字符串,并自己进行解析,但那太麻烦了。我也可以在持久化之前序列化POJO,但那只是浪费资源。所以我想,也许有一种方法可以“潜入”WebFlux/Netty管道,这样我就可以在处理过程中将请求体的表示保留在内存中,以便进行持久化。不确定为什么这比手动序列化更糟糕。 - 62mkv

15

这是我为Java想出来的东西。

public class RequestResponseLoggingFilter implements WebFilter {

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        ServerHttpRequest httpRequest = exchange.getRequest();
        final String httpUrl = httpRequest.getURI().toString();

        ServerHttpRequestDecorator loggingServerHttpRequestDecorator = new ServerHttpRequestDecorator(exchange.getRequest()) {
            String requestBody = "";

            @Override
            public Flux<DataBuffer> getBody() {
                return super.getBody().doOnNext(dataBuffer -> {
                    try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
                        Channels.newChannel(byteArrayOutputStream).write(dataBuffer.asByteBuffer().asReadOnlyBuffer());
                        requestBody = IOUtils.toString(byteArrayOutputStream.toByteArray(), "UTF-8");
                        commonLogger.info(LogMessage.builder()
                                .step(httpUrl)
                                .message("log incoming http request")
                                .stringPayload(requestBody)
                                .build());
                    } catch (IOException e) {
                        commonLogger.error(LogMessage.builder()
                                .step("log incoming request for " + httpUrl)
                                .message("fail to log incoming http request")
                                .errorType("IO exception")
                                .stringPayload(requestBody)
                                .build(), e);
                    }
                });
            }
        };

        ServerHttpResponseDecorator loggingServerHttpResponseDecorator = new ServerHttpResponseDecorator(exchange.getResponse()) {
            String responseBody = "";
            @Override
            public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
                Mono<DataBuffer> buffer = Mono.from(body);
                return super.writeWith(buffer.doOnNext(dataBuffer -> {
                    try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
                        Channels.newChannel(byteArrayOutputStream).write(dataBuffer.asByteBuffer().asReadOnlyBuffer());
                        responseBody = IOUtils.toString(byteArrayOutputStream.toByteArray(), "UTF-8");
                        commonLogger.info(LogMessage.builder()
                                .step("log outgoing response for " + httpUrl)
                                .message("incoming http request")
                                .stringPayload(responseBody)
                                .build());
                    } catch (Exception e) {
                        commonLogger.error(LogMessage.builder()
                                .step("log outgoing response for " + httpUrl)
                                .message("fail to log http response")
                                .errorType("IO exception")
                                .stringPayload(responseBody)
                                .build(), e);
                    }
                }));
            }
        };
        return chain.filter(exchange.mutate().request(loggingServerHttpRequestDecorator).response(loggingServerHttpResponseDecorator).build());
    }

}

请注意,仅通过重写 getBody() 方法的方式只适用于访问主体的控制器(请参见 @RequestBody)。 - OSGI Java

13

我找不到一个好的方法来记录请求/响应体,但如果你只关心元数据,那么可以按照以下方式进行。

import org.springframework.http.HttpHeaders
import org.springframework.http.HttpStatus
import org.springframework.http.server.reactive.ServerHttpResponse
import org.springframework.stereotype.Component
import org.springframework.web.server.ServerWebExchange
import org.springframework.web.server.WebFilter
import org.springframework.web.server.WebFilterChain
import reactor.core.publisher.Mono

@Component
class LoggingFilter(val requestLogger: RequestLogger, val requestIdFactory: RequestIdFactory) : WebFilter {
    val logger = logger()

    override fun filter(exchange: ServerWebExchange, chain: WebFilterChain): Mono<Void> {
        logger.info(requestLogger.getRequestMessage(exchange))
        val filter = chain.filter(exchange)
        exchange.response.beforeCommit {
            logger.info(requestLogger.getResponseMessage(exchange))
            Mono.empty()
        }
        return filter
    }
}

@Component
class RequestLogger {

    fun getRequestMessage(exchange: ServerWebExchange): String {
        val request = exchange.request
        val method = request.method
        val path = request.uri.path
        val acceptableMediaTypes = request.headers.accept
        val contentType = request.headers.contentType
        return ">>> $method $path ${HttpHeaders.ACCEPT}: $acceptableMediaTypes ${HttpHeaders.CONTENT_TYPE}: $contentType"
    }

    fun getResponseMessage(exchange: ServerWebExchange): String {
        val request = exchange.request
        val response = exchange.response
        val method = request.method
        val path = request.uri.path
        val statusCode = getStatus(response)
        val contentType = response.headers.contentType
        return "<<< $method $path HTTP${statusCode.value()} ${statusCode.reasonPhrase} ${HttpHeaders.CONTENT_TYPE}: $contentType"
    }

    private fun getStatus(response: ServerHttpResponse): HttpStatus =
        try {
            response.statusCode
        } catch (ex: Exception) {
            HttpStatus.CONTINUE
        }
}

每次使用这种方法时,我都会得到一个http 100状态(因为response.statusCode为空)。到目前为止,我还没有能够弄清楚如何在WebFilter中正确获取响应的状态码。有人知道吗? - Kyprus
2
该死的“var”东西。 - K.Nicholas

5

您实际上可以启用与Netty和Reactor-Netty相关的DEBUG日志记录,以便全面了解发生的情况。您可以尝试以下内容,看看您想要什么,不想要什么。这是我所能提供的最好帮助。

reactor.ipc.netty.channel.ChannelOperationsHandler: DEBUG
reactor.ipc.netty.http.server.HttpServer: DEBUG
reactor.ipc.netty.http.client: DEBUG
io.reactivex.netty.protocol.http.client: DEBUG
io.netty.handler: DEBUG
io.netty.handler.proxy.HttpProxyHandler: DEBUG
io.netty.handler.proxy.ProxyHandler: DEBUG
org.springframework.web.reactive.function.client: DEBUG
reactor.ipc.netty.channel: DEBUG

1
这是一种本地调试选项,但我们不能在生产实例中使用它,因为它会暴露头部内容。 - Arunkumar Arjunan
2
我刚刚添加了这个。它甚至没有显示任何日志。 - RamPrakash

5

自从Spring Boot 2.2.x起,Spring Webflux支持Kotlin协程。使用协程,可以在无需处理Mono和Flux包装对象的情况下获得非阻塞调用的优势。它为ServerRequestServerResponse添加了扩展方法,例如ServerRequest#awaitBody()ServerResponse.BodyBuilder.bodyValueAndAwait(body: Any)。因此,您可以像这样重写代码:

@Bean
fun apiRouter() = coRouter {
    (accept(MediaType.APPLICATION_JSON) and "/api").nest {
        "/user".nest {
            /* the handler methods now use ServerRequest and ServerResponse directly
             you just need to add suspend before your function declaration:
             suspend fun listUsers(ServerRequest req, ServerResponse res) */ 
            GET("/", userHandler::listUsers)
            POST("/{userId}", userHandler::updateUser)
        }
    }

    // this filter will be applied to all routes built by this coRouter
    filter { request, next ->
      // using non-blocking request.awayBody<T>()
      logger.info("Processing $request with body ${request.awaitBody<String>()}")
        val res = next(request)
        logger.info("Handling with Content-Type ${res.headers().contentType} and status code ${res.rawStatusCode()}")
        res 
    }
}

为了使用coRoutines创建WebFilter Bean,我认为您可以使用这个CoroutineWebFilter接口(我没有测试过,不知道它是否有效)。

4

我对Spring WebFlux还比较陌生,不知道如何在Kotlin中操作,但应该与Java中使用WebFilter相同:

public class PayloadLoggingWebFilter implements WebFilter {

    public static final ByteArrayOutputStream EMPTY_BYTE_ARRAY_OUTPUT_STREAM = new ByteArrayOutputStream(0);

    private final Logger logger;
    private final boolean encodeBytes;

    public PayloadLoggingWebFilter(Logger logger) {
        this(logger, false);
    }

    public PayloadLoggingWebFilter(Logger logger, boolean encodeBytes) {
        this.logger = logger;
        this.encodeBytes = encodeBytes;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        if (logger.isInfoEnabled()) {
            return chain.filter(decorate(exchange));
        } else {
            return chain.filter(exchange);
        }
    }

    private ServerWebExchange decorate(ServerWebExchange exchange) {
        final ServerHttpRequest decorated = new ServerHttpRequestDecorator(exchange.getRequest()) {

            @Override
            public Flux<DataBuffer> getBody() {

                if (logger.isDebugEnabled()) {
                    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    return super.getBody().map(dataBuffer -> {
                        try {
                            Channels.newChannel(baos).write(dataBuffer.asByteBuffer().asReadOnlyBuffer());
                        } catch (IOException e) {
                            logger.error("Unable to log input request due to an error", e);
                        }
                        return dataBuffer;
                    }).doOnComplete(() -> flushLog(baos));

                } else {
                    return super.getBody().doOnComplete(() -> flushLog(EMPTY_BYTE_ARRAY_OUTPUT_STREAM));
                }
            }

        };

        return new ServerWebExchangeDecorator(exchange) {

            @Override
            public ServerHttpRequest getRequest() {
                return decorated;
            }

            private void flushLog(ByteArrayOutputStream baos) {
                ServerHttpRequest request = super.getRequest();
                if (logger.isInfoEnabled()) {
                    StringBuffer data = new StringBuffer();
                    data.append('[').append(request.getMethodValue())
                        .append("] '").append(String.valueOf(request.getURI()))
                        .append("' from ")
                            .append(
                                Optional.ofNullable(request.getRemoteAddress())
                                            .map(addr -> addr.getHostString())
                                        .orElse("null")
                            );
                    if (logger.isDebugEnabled()) {
                        data.append(" with payload [\n");
                        if (encodeBytes) {
                            data.append(new HexBinaryAdapter().marshal(baos.toByteArray()));
                        } else {
                            data.append(baos.toString());
                        }
                        data.append("\n]");
                        logger.debug(data.toString());
                    } else {
                        logger.info(data.toString());
                    }

                }
            }
        };
    }

}

这里有一些相关的IT技术测试:github 我认为这就是Brian Clozel (@brian-clozel)所指的。

6
这与反应式编程背道而驰,因为你需要缓存整个内容。这显然不是Brian所说的。 - Abhijit Sarkar
我发现这个例子很有帮助。我可以使用这种机制将请求JSON保存在数据库中,以维护审计跟踪。 - Sylvester
@Silvmike 这仅适用于调用 getBody() 的 POST 请求。如果我需要为 GET 请求调用 flushLog,应该怎么做?在这种情况下,不会调用 getBody()。 - user2761431
我进行了一次黑客攻击,我覆盖了 getMethodValue() 方法,并在请求类型为 GET 时调用了 flushLog。 - user2761431
请注意,仅通过覆盖 getBody() 方法的方法仅适用于访问正文的控制器(请参见 @RequestBody)。 - OSGI Java

2
"Brian所说的。此外,记录请求/响应主体对于反应式流不合理。如果您将通过管道流动的数据想象为流,则在任何时间都没有完整的内容,除非您对其进行缓冲,这样做就失去了全部意义。对于小型请求/响应,您可以使用缓冲来解决问题,但那么为什么要使用反应式模型(除了让同事们印象深刻 :-))?"
"我能想到的记录请求/响应的唯一原因是调试,但是使用反应式编程模型时,调试方法也必须进行修改。 Project Reactor文档有一个关于调试的优秀部分,您可以参考:http://projectreactor.io/docs/core/snapshot/reference/#debugging。"

1
这是用于开发调试的。没有人会在生产环境中启用调试。我已经在我的另一篇文章中详细解释了为什么需要调试 https://dev59.com/Bafja4cB1Zd3GeqPsDVg?noredirect=1#comment82218651_47624492 - Ashok Koyi
1
这里有一个与调试无关的场景,假设您配置了重试策略以在返回 HttpStatus 503/504 时生效。我觉得记录此响应不仅仅是调试,它可能会为我们提供有用的信息,如果 API 这样做的话,就可以解释发生这种情况的原因。所以我希望我已经找到了一种在合理的代码行数内解决这个问题的方法,而不是像我在寻找几天后看到的那样长达 500 行的代码,这让我感到非常沮丧。 - tonio
如果您的目标是分析客户接收到的数据,并且您需要存储每个响应,那么您会如何处理呢?我考虑使用并行协程来执行此操作,而主线程将像往常一样运行。 - Leonid Bor
@LeonidBor 我会建议您发布一个新问题,而不是劫持5年前的线程。 - Abhijit Sarkar
@AbhijitSarkar 很抱歉,根据StackOverflow社区准则,我无法在同一主题上创建新问题。 我仍在寻找实现目标的好方法,到目前为止,最好的方向似乎是在事件循环线程中手动调用DataBufferUtils.retain(buffer),然后在日志记录线程中调用DataBufferUtils.release(buffer)一旦日志记录操作完成 - Leonid Bor

2
这里是与webflux/java相关的GitHub代码库,包含完整实现,可记录请求和响应体以及HTTP头。请注意保留HTML标签。

1

Ivan Lymar 的回答,但是用 Kotlin 实现:

import org.apache.commons.io.IOUtils
import org.reactivestreams.Publisher
import org.springframework.core.io.buffer.DataBuffer
import org.springframework.http.server.reactive.ServerHttpRequestDecorator
import org.springframework.http.server.reactive.ServerHttpResponseDecorator
import org.springframework.stereotype.Component
import org.springframework.web.server.ServerWebExchange
import org.springframework.web.server.WebFilter
import org.springframework.web.server.WebFilterChain
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.io.ByteArrayOutputStream
import java.io.IOException
import java.nio.channels.Channels

@Component
class LoggingWebFilter : WebFilter {

    override fun filter(exchange: ServerWebExchange, chain: WebFilterChain): Mono<Void> {
        val httpRequest = exchange.request
        val httpUrl = httpRequest.uri.toString()
        val loggingServerHttpRequestDecorator: ServerHttpRequestDecorator =
            object : ServerHttpRequestDecorator(exchange.request) {
                var requestBody = ""
                override fun getBody(): Flux<DataBuffer> {
                    return super.getBody().doOnNext { dataBuffer: DataBuffer ->
                        try {
                            ByteArrayOutputStream().use { byteArrayOutputStream ->
                                Channels.newChannel(byteArrayOutputStream)
                                    .write(dataBuffer.asByteBuffer().asReadOnlyBuffer())
                                requestBody =
                                    IOUtils.toString(
                                        byteArrayOutputStream.toByteArray(),
                                        "UTF-8"
                                    )
                                log.info(
                                    "Logging Request Filter: {} {}",
                                    httpUrl,
                                    requestBody
                                )
                            }
                        } catch (e: IOException) {
                            log.error(
                                "Logging Request Filter Error: {} {}",
                                httpUrl,
                                requestBody,
                                e
                            )
                        }
                    }
                }
            }

        val loggingServerHttpResponseDecorator: ServerHttpResponseDecorator =
            object : ServerHttpResponseDecorator(exchange.response) {
                var responseBody = ""
                override fun writeWith(body: Publisher<out DataBuffer>): Mono<Void> {
                    val buffer: Mono<DataBuffer> = Mono.from(body)
                    return super.writeWith(
                        buffer.doOnNext { dataBuffer: DataBuffer ->
                            try {
                                ByteArrayOutputStream().use { byteArrayOutputStream ->
                                    Channels.newChannel(byteArrayOutputStream)
                                        .write(
                                            dataBuffer
                                                .asByteBuffer()
                                                .asReadOnlyBuffer()
                                        )
                                    responseBody = IOUtils.toString(
                                        byteArrayOutputStream.toByteArray(),
                                        "UTF-8"
                                    )
                                    log.info(
                                        "Logging Response Filter: {} {}",
                                        httpUrl,
                                        responseBody
                                    )
                                }
                            } catch (e: Exception) {
                                log.error(
                                    "Logging Response Filter Error: {} {}",
                                    httpUrl,
                                    responseBody,
                                    e
                                )
                            }
                        }
                    )
                }
            }
        return chain.filter(
            exchange.mutate().request(loggingServerHttpRequestDecorator)
                .response(loggingServerHttpResponseDecorator)
                .build()
        )
    }
}

请注意,仅通过覆盖 getBody() 方法的方法仅适用于访问正文的控制器(请参见 @RequestBody)。 - OSGI Java

1
假设我们正在处理一个简单的 JSON 或 XML 响应,如果相应记录器的调试级别不足以某种原因,可以在将其转换为对象之前使用字符串表示形式:
Mono<Response> mono = WebClient.create()
                               .post()
                               .body(Mono.just(request), Request.class)
                               .retrieve()
                               .bodyToMono(String.class)
                               .doOnNext(this::sideEffectWithResponseAsString)
                               .map(this::transformToResponse);

以下是副作用和转换方法:
private void sideEffectWithResponseAsString(String response) { ... }
private Response transformToResponse(String response) { /*use Jackson or JAXB*/ }    

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接