在Spring WebFlux的WebClient中设置超时时间

14

我正在使用Spring Webflux WebClient在我的Spring boot应用程序中进行REST调用,但每次都会在30秒内超时。

这是我尝试设置Spring webfulx WebClient套接字超时的一些代码。

 - ReactorClientHttpConnector connector = new ReactorClientHttpConnector(options -> options
           .option(ChannelOption.SO_TIMEOUT, 600000).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 600000));
 - ReactorClientHttpConnector connector = new ReactorClientHttpConnector(
           options -> options.afterChannelInit(chan -> {
                chan.pipeline().addLast(new ReadTimeoutHandler(600000));
            }));
 - ReactorClientHttpConnector connector1 = new ReactorClientHttpConnector(options -> options
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 600000).afterNettyContextInit(ctx -> {
                ctx.addHandlerLast(new ReadTimeoutHandler(600000, TimeUnit.MILLISECONDS));
            }));

尝试使用“clientConnector”方法在“WebClient”中添加上述连接器设置。

还尝试设置超时如下:

webClient.get().uri(builder -> builder.path("/result/{name}/sets")
                    .queryParam("q", "kind:RECORDS")
                    .queryParam("offset", offset)
                    .queryParam("limit", RECORD_COUNT_LIMIT)
                    .build(name))
            .header(HttpHeaders.AUTHORIZATION, accessToken)
            .exchange().timeout(Duration.ofMillis(600000))
            .flatMap(response -> handleResponse(response, name, offset));

以上选项均不适用于我。

我正在使用org.springframework.boot:spring-boot-gradle-plugin:2.0.0.M7,它内部依赖于org.springframework:spring-webflux:5.0.2.RELEASE。

请在此处提出建议,并让我知道是否有任何错误之处。


已经尝试过 spring-5-webflux-how-to-set-a-timeout-on-webclient - Abhishek Pawnikar
为什么你需要一个10分钟的超时时间? - ESala
这只是为了测试目的。如果我能够设置超时,那么我将根据我的应用程序对此时间进行基准测试。 - Abhishek Pawnikar
4个回答

20

我已经尝试重现该问题,但未成功。使用reactor-netty 0.7.5.RELEASE。

我不确定您所说的超时是哪个。

连接超时可以使用ChannelOption.CONNECT_TIMEOUT_MILLIS进行配置。在“连接”日志消息和实际错误之间我得到了10秒钟的时间:

WebClient webClient = WebClient.builder()
    .clientConnector(new ReactorClientHttpConnector(options -> options
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)))
    .build();

webClient.get().uri("http://10.0.0.1/resource").exchange()
    .doOnSubscribe(subscription -> logger.info("connecting"))
    .then()
    .doOnError(err -> logger.severe(err.getMessage()))
    .block();

如果您在谈论读/写超时,则可以查看Netty的ReadTimeoutHandlerWriteTimeoutHandler

一个完整的示例可能如下所示:

ReactorClientHttpConnector connector = new ReactorClientHttpConnector(options ->
        options.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10)
                .onChannelInit(channel -> {
                        channel.pipeline().addLast(new ReadTimeoutHandler(10))
                                .addLast(new WriteTimeoutHandler(10));
                return true;
        }).build());

从 Reactor Netty 0.8 和 Spring Framework 5.1 开始,配置现在看起来像这样:

TcpClient tcpClient = TcpClient.create()
                 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000)
                 .doOnConnected(connection ->
                         connection.addHandlerLast(new ReadTimeoutHandler(10))
                                   .addHandlerLast(new WriteTimeoutHandler(10)));
WebClient webClient = WebClient.builder()
    .clientConnector(new ReactorClientHttpConnector(HttpClient.from(tcpClient)))
    .build();

也许将以下内容添加到您的application.properties文件中,可以提供更多关于HTTP层面上正在发生的情况的信息:

可能将以下内容添加到您的application.properties文件中,可以提供有关HTTP层面上正在发生的情况的更多信息:

logging.level.reactor.ipc.netty.channel.ContextHandler=debug
logging.level.reactor.ipc.netty.http.client.HttpClient=debug

requestTimeout确实等于ChannelOption.SO_TIMEOUT。connectTimeoutMillis绝对是关于连接的。 - Abhishek Pawnikar
@BrianClozel 感谢您分享适用于 Reactor Netty 0.8 和 Spring Framework 5.1 的更新选项,以设置读取超时和连接超时。 - samaitra
@BrianClozel 由于这个解决方案已经被弃用,实现它的正确方式是什么?我在 HttpClient 上看到了一个公共的 final 方法 HttpClient responseTimeout(Duration timeout),但不确定是否足够(最多可能只是 readtimeout,但如何指定 connecttimeout 呢?)。谢谢。 - simpleusr
@im_bhatman 由于此解决方案已被弃用,实现它的正确方法是什么?我在HttpClient上看到了一个公共的final HttpClient responseTimeout(Duration timeout)方法,但不确定这是否足够(最好的情况下,我猜这是readtimeout,但如何指定connecttimeout呢?)?谢谢。 - simpleusr
@simpleusr,您可以使用简单的HttpClient.create(),并直接在其上使用.responseTimeout()进行配置。这将适用于旧的和新的netty实现。(实现在答案中单独给出)。 - im_bhatman
显示剩余6条评论

7
自从最新版本的netty(v0.9.x)中不再支持HttpClient.from(tcpClient),并且将在v1.1.0中删除该方法。您可以使用responseTimeout()替代,并忽略其他代码中看到的太多HTTP连接配置,这个实现可以同时适用于旧版和新版。
创建HttpClient。
HttpClient httpClient = HttpClient.create().responseTimeout(Duration.ofMillis(500)); // 500 -> timeout in millis

使用webClient builder函数.clientConnector()将httpClient添加到webclient中。
WebClient
.builder()
.baseUrl("http://myawesomeurl.com")
.clientConnector(new ReactorClientHttpConnector(httpClient))
.build();

另外,网站上可用的实现大多数都未被弃用。为了确保您不会使用已弃用的实现,您可以查看此链接

FYI:对于旧版本的Netty(即< v0.9.11版本),responseTimeout()在底层使用tcpConfiguration(),而后者在新版本中已弃用。但是,在>=v0.9.11中,responseTimeout()使用了新的实现,因此即使将来更改项目中的Netty版本,您的代码也不会出错。

注意:如果您正在使用默认带有Spring的旧版本Netty,则可能也可以使用Brian的实现。(不确定)

如果您想了解更多关于responseTimeout()及其工作原理的信息,可以在这里这里或在Github gist这里查看源代码。


1
我相信这是从v0.9.11开始提供的,而不是所有的v0.9.x版本都有。我不得不升级我的Spring版本,它使用的是v0.9.4。 - KaiserCoaster
是的,responseTimeout 可在 0.9.11 版本后使用。感谢 @KaiserCoaster 指出这一点。 - im_bhatman

1
根据@im_bhatman提到的,HttpClient.from(TcpClient)方法已被弃用,这里介绍一种使用传统方式的方法。
// works for Spring Boot 2.4.0, 2.4.1, and 2.4.2
// doesn't work for Spring Boot 2.3.6, 2.3.7, and 2.3.8
HttpClient httpClient = HttpClient.create()
        //.responseTimeout(Duration.ofSeconds(READ_TIMEOUT_SECONDS))
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIME_MILLIS)
        .doOnConnected(c -> {
                c.addHandlerLast(new ReadTimeoutHandler(READ_TIMEOUT_SECONDS))
                 .addHandlerLast(new WriteTimeoutHandler(WRITE_TIMEOUT_SECONDS));
        });
ClientConnector clientConnector = new ReactorClientHttpConnector(httpClient);
WebClient webClient = WebClient.builder()
        .clientConnector(clientConnector)
        ...
        .build();

我不确定以下两者的区别:

  • HttpClient#responseTimeout(...)
  • HttpClient#doOnConnected(c -> c.addHandler(new ReadTimeoutHandler(...)))

其中第一个是设置HTTP响应超时时间,而第二个是在建立连接后添加一个读取超时处理器。


0
请参考下面的代码块,使用WebClient设置超时和重试。
.retrieve()
            .onStatus(
                   (HttpStatus::isError), // or the code that you want
                    (it -> handleError(it.statusCode().getReasonPhrase())) //handling error request
           )
            .bodyToMono(String.class)
            .timeout(Duration.ofSeconds(5))
            .retryWhen(
                    Retry.backoff(retryCount, Duration.ofSeconds(5))
                            .filter(throwable -> throwable instanceof TimeoutException)
           )

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接