如何测量Webflux WebClient方法的执行时间?

3
我准备了一堆请求,想要并行发送到一个外部Web服务。在这个过程中,我会继续直接处理响应(例如将某些内容插入数据库)。
问题:我想追踪最大的请求时间(针对一个请求!),但不包括处理时间。但是按照目前的写法,这只会跟踪包括任何子进程在内的全局时间。
StopWatch watch = new StopWatch();
watch.start();

Flux.fromIterable(requests)
    .flatMap(req -> webClient.send(req, MyResponse.class)
            .doOnSuccess(rsp -> processResponse(rsp))) //assume some longer routine
    .collectList()
    .block();
    
watch.stop();
System.out.println(w.getTotalTimeMillis());
            

问题:如何测量请求所花费的最长时间,但不包括 processResponse() 的时间?

1
你可以在内部发布者上使用定时操作符:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#timed-- - Martin Tarjányi
你是在使用Spring Boot还是仅仅使用Spring框架? - Michael McFadyen
@MichaelMcFadyen 我也在使用 spring-boot - membersound
@MartinTarjányi 请给我一个使用 timed()elapsed() 的例子。我在网上搜索了一下,但没有找到相关的例子... - membersound
计时和经过的工作时间,除非有错误; 当出现错误时,我该如何使用elapsed或timed? - xpmatteo
3个回答

6

在使用 elapsed 方法时,你将得到一个包含已消耗时间和原始对象的元组的单值。你需要解除包装才能使用它们。我写了一个示例(从你的代码中稍作简化)来测试它是否正常工作:

@Test
public void elapsed() {

    Flux.fromIterable(List.of(1, 2, 3, 4, 5))
        .flatMap(req -> Mono.delay(Duration.ofMillis(100L * req))
                            .map(it -> "response_" + req)
                            .elapsed()
                            .doOnNext(it -> System.out.println("I took " + it.getT1() + " MS"))
                            .map(Tuple2::getT2)
                            .doOnSuccess(rsp -> processResponse(rsp)))
        .collectList()
        .block();

}

@SneakyThrows
public void processResponse(Object it) {
    System.out.println("This is the response: " + it);
    Thread.sleep(1000);
}

输出结果如下:
I took 112 MS
This is the response: response_1
I took 205 MS
This is the response: response_2
I took 305 MS
This is the response: response_3
I took 403 MS
This is the response: response_4
I took 504 MS
This is the response: response_5

这些数字代表延迟(在您的情况下应该是webClient.send())和响应式流水线本身的一些开销。它是在订阅时计算的(当特定请求的flatMap运行时发生),并在下一个信号(在我的示例中为map的结果,在您的示例中为webclient请求的结果)之间进行计算。

您的代码将如下所示:

Flux.fromIterable(requests)
        .flatMap(req -> webClient.send(req, MyResponse.class)
                                 .elapsed()
                                 .doOnNext(it -> System.out.println("I took " + it.getT1() + " MS"))
                                 .map(Tuple2::getT2)
                                 .doOnSuccess(rsp -> processResponse(rsp))) //assume some longer routine
        .collectList()
        .block();

请注意,如果您想使用秒表代替计时器,在代码中进行如下更改:

Flux.fromIterable(List.of(1, 2, 3, 4, 5)).flatMap(req -> {
            StopWatch stopWatch = new StopWatch();
            return Mono.fromRunnable(stopWatch::start)
                       .then(Mono.delay(Duration.ofMillis(100L * req)).map(it -> "response_" + req).doOnNext(it -> {
                           stopWatch.stop();
                           System.out.println("I took " + stopWatch.getTime() + " MS");
                       }).doOnSuccess(this::processResponse));
        }).collectList().block();

但是我个人建议使用.elapsed()方法来解决,因为它更加简洁。


使用 timed()elapsed() 有什么区别吗?我注意到两者通常都可以工作。 - membersound
区别在于你如何获取返回值,可以使用元组或包装对象。两者确实以类似的方式工作。 - p.streef

0

我建议在该方法中避免使用秒表。相反,创建一个度量包装器,可以在其他地方使用。

您可以利用.doOnSubscribe()、.doOnError()、.doOnSuccess() 但是为了回答您的问题,您可以像这样使用计时器


    public sendRequest(){
                Flux.fromIterable(requests)
                .flatMap(req -> webClient.send(req, MyResponse.class)
                        .transform(timerPublisher("time took for ", req.id)))
                .collectList()
                .block();
   }

//this can be made sophisticated by determining what kind of publisher it is
//mono or flux
    private Function<Mono<T>, Publisher<T>> timerPublisher(String metric) {

        StopWatchHelper stopWatch = new StopWatchHelper(metric);
        return s -> s.doOnSubscribe((s) -> stopWatch.start())
                .doOnSuccess(documentRequest -> stopWatch.record())
                .doOnError(stopWatch::record);
    }

    private class StopWatchHelper{
        private StopWatch stopWatch;
        private String metric;
        public StopWatchHelper(String metric){
            this.metric = metric;
            stopWatch = new StopWatch();
        }
        public Consumer<Subscription> start() {
            return (s) -> stopWatch.start();
        }

        public void record(){
            if(stopWatch.isStarted()){
                System.out.println(String.format("Metric %s took %s", metric, stopWatch.getTime()));
            }
        }

        public void record(Throwable t){
            if(stopWatch.isStarted()){
                System.out.println(String.format("Metric %s took %s, reported in error %s", metric, stopWatch.getTime(),throwable));
            }
        }
    }


PS: Avoid using .block() -> it beats the purpose :) 

-2

Spring Boot 提供了一个开箱即用的功能,可以为您的 WebClient 添加仪表板。

您可以通过使用自动配置的 WebClient.Builder 来创建您的 WebClient 实例来“启用”这些指标。

@Bean
public WebClient myCustomWebClient(WebClient.Builder builder) {
    return builder
            // your custom web client config code
            .build();
}

这个仪器将会计时你的每个WebClient API调用,并在你配置的MeterRegistry中注册它。

参考文档


我该如何访问在一个进程中发出的请求,并提取“最长”的时间? - membersound
此儀表將默認公開3個指標——計數、總計和最大值。最大值將告訴您最慢的API調用的持續時間。 - Michael McFadyen

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接