非阻塞Java异步处理 - 如何限制内存使用?

3
我离开 Java 已经有几年了,最近很兴奋地看到新的 java.net.http.HttpClientAWS Java SDK 2.0 支持非阻塞异步操作。几年前,我在会议演讲中听说过反应式编程的概念,但实践机会不多。
我有一个问题,似乎很适合尝试这种编程风格:基本上,我想通过 HTTP 下载大量文件(比如 10,000 个)并将它们写回到 S3。
我已经使用 failsafe 实现了针对非阻塞异步 http GET 的重试,并且可以轻松地通过 S3 异步客户端将其与上传组合起来(请参见下面的草图)。
然而,我不确定如何正确限制程序的内存使用:如果文件下载速度比写回到 S3 的速度快,就没有机制施加背压并防止内存不足异常。
我熟悉一些传统的阻塞解决方案来解决这个问题 - 例如使用信号量来限制并发下载数量,或将下载内容写入某个有界阻塞队列,S3上传线程将从中拉取。然而,如果我要使用这样的阻塞机制来应用反压力,则会让我质疑首先使用非阻塞IO的优势。
是否有更具代表性的“响应式”方法来实现相同的目标?
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class BackupClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(BackupClient.class);
    private final HttpClient httpClient = HttpClient.newBuilder().build();
    private final S3AsyncClient s3AsyncClient = S3AsyncClient.create();

    public runBackup(List<URI> filesToBackup) {
        List<CompletableFuture<PutObjectResponse>> futures = filesToBackup.stream()
                .map(backupClient::submitBackup)
                .collect(Collectors.toList());

        futures.forEach(CompletableFuture::join);
    }

    private CompletableFuture<PutObjectResponse> submitBackup(URI uri) {
        return sendAsyncWithRetries(uri, HttpResponse.BodyHandlers.ofString())
                .thenCompose(httpResponse -> s3AsyncClient.putObject(PutObjectRequest.builder()
                        .bucket("my-bucket")
                        .key(uri.toASCIIString())
                        .build(), AsyncRequestBody.fromString(httpResponse.body())));
    }


    private <T> CompletableFuture<HttpResponse<T>> sendAsyncWithRetries(URI uri, HttpResponse.BodyHandler<T> handler) {
        final HttpRequest request = HttpRequest.newBuilder()
                .uri(uri)
                .timeout(Duration.ofMinutes(2))
                .GET()
                .build();

        final var retryPolicy = new RetryPolicy<HttpResponse<T>>()
                .withMaxRetries(4)
                .withDelay(Duration.ofSeconds(1))
                .handleResultIf(response -> 200 != response.statusCode());

        return Failsafe.with(retryPolicy)
                .getStageAsync(context -> {
                    if (context.getAttemptCount() > 0) {
                        LOGGER.error("Retry " + context.getAttemptCount() + " for " + uri);
                    }
                    return this.httpClient.sendAsync(request, handler);
                });
    }
}
1个回答

3

如果您需要控制资源(内存)消耗,那么 Semaphore 是实现此目标的正确工具。而且,如果您想要使用非阻塞计算,那么您所需要的就是异步 Semaphore。流行的库(rxjava、reactive streams)在内部使用异步 Semaphore 构建反应式流,但不提供它作为单独的类。当反应式流的订阅者调用 Flow.Subscription.request(n) 时,相当于调用 Semaphore.release(n)。然而,Semaphore.acquire() 的类比被隐藏了起来,由发布者在内部调用。

这种设计方案的缺点是,资源反馈只能在生产者和最近的消费者之间建立。如果有一系列的生产者和消费者,则必须分别控制每个链接的资源消耗,并且整体资源消耗会变成 N 倍,其中 N 是链接数。

如果您能承担这个成本,那么您可以使用rxjava或任何其他反应流库的实现。如果不能,那么您必须使用唯一的异步库,该库允许用户完全访问异步Semaphore实现DF4J(是的,我是作者)。它不包含直接解决您问题的方案,但具有一个示例,其中异步网络服务器通过异步信号量限制了同时连接的数量,请参见ConnectionManager.java

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接