如何使用Mono和Flux限制并发的HTTP请求

4

我想处理Flux,以限制由Mono列表发出的并发HTTP请求数量。

当一些请求完成(收到响应)后,服务再发送一个请求,直到等待请求的总数为15。

单个请求返回一个列表,并根据结果触发另一个请求。

现在,我想限制并发地发送请求。因为消费方,太多的HTTP请求会让服务器陷入麻烦。

我使用以下方式的flatMapMany

public Flux<JsonNode> syncData() {
    return service1
        .getData(param1)
        .flatMapMany(res -> {
                List<Mono<JsonNode>> totalTask = new ArrayList<>();
                Map<String, Object> originData = service2.getDataFromDB(param2);
                res.withArray("data").forEach(row -> {
                       String id = row.get("id").asText();
                       if (originData.containsKey(id)) {
                           totalTask.add(service1.updateRequest(param3));
                       } else {
                            totalTask.add(service1.deleteRequest(param4));
                       }
                       originData.remove(id);
                });
                for (left) {
                    totalTask.add(service1.createRequest(param5));
                }
                return Flux.merge(totalTask);
        });
}

void syncData() {
    syncDataService.syncData().????;
}

我试过链式使用 .window(15),但它并不起作用。所有请求都同时发送。

为了实现我的目标,我该如何处理 Flux

3个回答

3
我担心Project Reactor没有提供速率或时间限制的任何实现。
但是,您可以找到一堆第三方库,它们提供此类功能并与Project Reactor兼容。据我所知,resilience4-reactor支持这一点,并且还与Spring和Spring Boot框架兼容。

RateLimiterOperator 检查下游订阅者/观察者是否可以获取订阅上游Publisher的权限。如果超过速率限制,则 RateLimiterOperator 可以延迟从上游请求数据,或者向下游订阅者发出 RequestNotPermitted 错误。

RateLimiter rateLimiter = RateLimiter.ofDefaults("name");
Mono.fromCallable(backendService::doSomething)
    .transformDeferred(RateLimiterOperator.of(rateLimiter))

关于RateLimiter模块的更多信息,请参见此处:https://resilience4j.readme.io/docs/ratelimiter


1

1

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接