向WebClient添加重试所有请求功能

24

我们有一个服务器用于获取OAUTH令牌,而且通过WebClient.filter方法将OAUTH令牌添加到每个请求中。

webClient
                .mutate()
                .filter((request, next) -> tokenProvider.getBearerToken()
                        .map(token -> ClientRequest.from(request)
                                .headers(httpHeaders -> httpHeaders.set("Bearer", token))
                                .build()).flatMap(next::exchange))
                .build();
TokenProvider.getBearerToken returns Mono<String> since it is a webclient request (this is cached)

我希望有一个重试功能,在401错误时,会使token失效并再次尝试请求。我的实现方式如下:

webClient.post()
            .uri(properties.getServiceRequestUrl())
            .contentType(MediaType.APPLICATION_JSON)
            .body(fromObject(createRequest))
            .retrieve()
            .bodyToMono(MyResponseObject.class)
            .retryWhen(retryOnceOn401(provider))

private Retry<Object> retryOnceOn401(TokenProvider tokenProvider) {
        return Retry.onlyIf(context -> context.exception() instanceof WebClientResponseException && ((WebClientResponseException) context.exception()).getStatusCode() == HttpStatus.UNAUTHORIZED)
                .doOnRetry(objectRetryContext -> tokenProvider.invalidate());
    }

有没有办法将这段代码移动到 webClient.mutate().....build() 函数中,以便所有请求都具备重试功能?

我尝试将其添加为筛选器,但似乎无效,例如:

.filter(((request, next) -> next.exchange(request).retryWhen(retryOnceOn401(tokenProvider))))

有什么最佳的方法建议来处理这个问题吗? 谢谢


请提供更多关于“它不起作用”的信息 - 您是否收到异常?您的重试函数是否被调用?令牌是否未失效?您能提供log()操作符的日志输出吗? - Brian Clozel
2
嗨,Brian,我想我弄清楚了。WebClient在401时不会抛出异常,因为只有在调用bodyToMono后才会抛出异常,因为这会检查ClientResponse的状态,并在出现错误时抛出WebClientResponseException。 因此,在构建器上,retryWhen实际上从未被调用,因为没有异常被抛出,我可以通过检查响应是否为401并在那时抛出异常,然后重试函数就会启动。 - Kevin Hussey
很高兴听到这个消息!请回答你的问题,我相信这会帮助其他人。 - Brian Clozel
3个回答

25
我弄清楚了,发现retry仅在异常情况下起作用,webClient不会抛出异常,因为clientResponse对象只保存响应,只有在调用bodyTo时才会根据http状态抛出异常,所以要解决这个问题,可以模仿这种行为。
@Bean(name = "retryWebClient")
    public WebClient retryWebClient(WebClient.Builder builder, TokenProvider tokenProvider) {
        return builder.baseUrl("http://localhost:8080")
                .filter((request, next) ->
                        next.exchange(request)
                            .doOnNext(clientResponse -> {
                                    if (clientResponse.statusCode() == HttpStatus.UNAUTHORIZED) {
                                        throw new RuntimeException();
                                    }
                            }).retryWhen(Retry.anyOf(RuntimeException.class)
                                .doOnRetry(objectRetryContext -> tokenProvider.expire())
                                .retryOnce())

                ).build();
    }

编辑具有重复/重试功能的一个特点是,它不会更改原始请求,在我的情况下,我需要检索新的OAuth令牌,但上面发送了相同的(过期)令牌。 我已经想出了一种使用交换过滤器进行此操作的方法,一旦OAuth密码流在spring-security-2.0中,我应该能够将其与AccessTokens等集成,但在此期间。

ExchangeFilterFunction retryOn401Function(TokenProvider tokenProvider) {
        return (request, next) -> next.exchange(request)
                .flatMap((Function<ClientResponse, Mono<ClientResponse>>) clientResponse -> {
                    if (clientResponse.statusCode().value() == 401) {
                        ClientRequest retryRequest = ClientRequest.from(request).header("Authorization", "Bearer " + tokenProvider.getNewToken().toString()).build();
                        return next.exchange(retryRequest);
                    } else {
                        return Mono.just(clientResponse);
                    }
                });
    }

小心使用retry和filter,因为可能会创建一个无限循环。 - Saljack
@Saljack 在测试这个解决方案时,我没有看到任何无限循环发生的情况。你有具体的场景可以出现吗? - M. Justin

4

我完全可以通过 ExchangeFilterFunction 来实现这一点,而无需抛出异常或进行类似的操作。

最初让我困惑的是期望响应(Mono、Flux 等)与从结果 WebClient 调用中获得的响应相同。当您使用 WebClient 时,如果收到未经授权的 Mono,则会出现“错误”,您可以通过像 onErrorResume 这样的东西来处理它。然而,在 ExchangeFilterFunction 中,如果您调用 next.exchange(ClientRequest),则返回的 Mono 只是一个类型为 ClientResponse 的普通成功值,即使返回未经授权。

因此,您可以使用以下代码来处理它(其中令牌服务被替换为您特定的令牌处理代码):

public class OneRetryAuthExchangeFilterFunction implements ExchangeFilterFunction {

    private final ITokenService tokenService;

    @Override
    public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
        ClientRequest authenticatedRequest = applyAuthentication(request);

        return next.exchange(authenticatedRequest)
                .flatMap(response -> {
                    if (HttpStatus.UNAUTHORIZED.equals(response.statusCode())) {
                        tokenService.forceRefreshToken();

                        ClientRequest refreshedAuthenticatedTokenRequest = applyAuthentication(request);

                        return next.exchange(refreshedAuthenticatedTokenRequest);
                    }

                    return Mono.just(response);
                });
    }

    private ClientRequest applyAuthentication(ClientRequest request) {
        String authenticationToken = tokenService.getToken();

        return ClientRequest.from(request)
                .headers(headers -> headers.setBearerAuth(authenticationToken))
                .build();
    }
}

您可以通过以下方式配置WebClient

WebClient.builder()
        .filter(new OneRetryAuthExchangeFilterFunction(tokenService))
        .build();

并且所有使用该WebClient的用户都将在未授权响应时进行单次重试身份验证


4

通用需求的一般方法:

@Configuration
public class WebConfiguration {

@Bean
@Primary
public WebClient webClient(ObjectMapper mapper) {

WebClient httpClient =
    WebClient.builder()
        .filter(retryFilter())
        .build();

  return httpClient;
}

private ExchangeFilterFunction retryFilter() {
return (request, next) ->
    next.exchange(request)
        .retryWhen(
            Retry.fixedDelay(3, Duration.ofSeconds(30))
              .doAfterRetry(retrySignal -> log.warn("Retrying"));
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接