Java 11 HttpClient Http2 太多的流错误

12

我正在使用Java 11的HttpClient向HTTP2服务器发送请求。如下所示,HttpClient对象是作为单例Spring bean创建的。

@Bean
    public HttpClient getClient() {
                return HttpClient.newBuilder().version(Version.HTTP_2).executor(Executors.newFixedThreadPool(20)).followRedirects(Redirect.NORMAL)
                .connectTimeout(Duration.ofSeconds(20)).build();
    }

我正在使用sendAsync方法异步发送请求。

当我尝试连续向服务器发送请求时,一段时间后会收到错误信息“java.io.IOException: too many concurrent streams”。我在客户端构建中使用了Fixed threadpool来尝试解决这个问题,但仍然会出现相同的错误。

异常堆栈如下:

java.util.concurrent.CompletionException: java.io.IOException: too many concurrent streams
    at java.base/java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:367) ~[?:?]
    at java.base/java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1108) ~[?:?]
    at java.base/java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2235) ~[?:?]
    at java.net.http/jdk.internal.net.http.MultiExchange.responseAsyncImpl(MultiExchange.java:345) ~[java.net.http:?]
    at java.net.http/jdk.internal.net.http.MultiExchange.lambda$responseAsync0$2(MultiExchange.java:250) ~[java.net.http:?]
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072) ~[?:?]
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705) ~[?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.base/java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: java.io.IOException: too many concurrent streams
    at java.net.http/jdk.internal.net.http.Http2Connection.reserveStream(Http2Connection.java:440) ~[java.net.http:?]
    at java.net.http/jdk.internal.net.http.Http2ClientImpl.getConnectionFor(Http2ClientImpl.java:103) ~[java.net.http:?]
    at java.net.http/jdk.internal.net.http.ExchangeImpl.get(ExchangeImpl.java:88) ~[java.net.http:?]
    at java.net.http/jdk.internal.net.http.Exchange.establishExchange(Exchange.java:293) ~[java.net.http:?]
    at java.net.http/jdk.internal.net.http.Exchange.responseAsyncImpl0(Exchange.java:425) ~[java.net.http:?]
    at java.net.http/jdk.internal.net.http.Exchange.responseAsyncImpl(Exchange.java:330) ~[java.net.http:?]
    at java.net.http/jdk.internal.net.http.Exchange.responseAsync(Exchange.java:322) ~[java.net.http:?]
    at java.net.http/jdk.internal.net.http.MultiExchange.responseAsyncImpl(MultiExchange.java:304) ~[java.net.http:?]

有人可以帮助我解决这个问题吗?

服务器是Tomcat9,同时处理的最大流是默认值。

3个回答

8
当我尝试连续访问服务器时,服务器会有一个名为“max_concurrent_streams”的设置,该设置在HTTP/2连接初始建立期间与客户端通信。
如果你使用sendAsync不加限制地“连续访问服务器”,那么你就没有等待之前的请求完成,最终会超过max_concurrent_streams值并收到上述错误。
解决方案是同时发送少于max_concurrent_streams数量的请求;之后,只有在前一个请求完成后才会发送新的请求。这可以在客户端使用Semaphore或类似的东西轻松实现。

谢谢您的澄清,我现在明白了问题。我将使用一些屏障来限制请求。为了进一步理解,当我使用FixedThreadPool而不是默认的CachedThreadPool时,它会限制连接数,而每个连接都受到在服务器或客户端端配置的最大流数量的限制。我的理解正确吗? - mnpr
线程池不限制连接数。使用HTTP/2,可能只有一个与服务器的连接。您可能会限制并发请求的数量,但我不会依赖它,因为它高度依赖于实现。如果您升级Java版本,则固定线程池可能不会像以前的Java版本那样限制并发请求。我建议明确使用Semaphore,就像我在答案中解释的那样,这样它将适用于任何Java版本。 - sbordet

5

很遗憾,@sbordet 提出的使用 Semaphore 的方法对我不起作用。我尝试了以下代码:

var semaphore = semaphores.computeIfAbsent(getRequestKey(request), k -> new Semaphore(MAX_CONCURRENT_REQUESTS_NUMBER));

CompletableFuture.runAsync(semaphore::acquireUninterruptibly, WAITING_POOL)
                .thenComposeAsync(ignored -> httpClient.sendAsync(request, responseBodyHandler), ASYNC_POOL)
                .whenComplete((response, e) -> semaphore.release());

不能保证连接流在执行传递到下一个CompletableFuture时已被释放,而信号量被释放。对于正常执行,我的方法有效,但如果发生任何异常,则似乎在调用semaphore.release()之后连接流可能会被关闭。

最后,我最终使用了OkHttp。它处理了这个问题(如果并发流的数量达到max_concurrent_streams,它只会等待一些流空闲)。它还处理了GOAWAY帧。对于Java HttpClient,我不得不实现重试逻辑来处理这个问题,因为它只会抛出IOException,如果服务器发送GOAWAY帧。


你能解释一下这个例子中 getRequestKey(request) 是如何实现的吗? - Scot
2
@Scot 我刚刚以与Java中实现的类似方式实现了它。请参考 jdk.internal.net.http.Http2Connection#keyString 了解详细信息。 - Roman Kishchenko
这是否意味着单个主机:端口组合将有一个信号量(无论有多少连接)?我对单个主机:端口具有多个连接的情况感兴趣,我不确定是否有一种方法可以为每个唯一的连接实例创建窗口/信号量。也许客户端概念中内置了一种反应方式,但我还没有想出来。 - Scot
2
@Scot 这个问题是关于HTTP 2多路复用和Java客户端的。我之前研究过这个问题,所以不能确定,但是我记得如果主机支持HTTP 2多路复用,Java HTTP客户端只使用一个连接。因此,在这种情况下,每个scheme:host:port使用一个信号量是相关的。 - Roman Kishchenko

2
我认为@sbordet的回答是不正确的,这个错误并不是因为你的每秒请求数超过了MAX_CONCURRENT_STREAMS,而是因为打开的HTTP流的数量(每个HTTP 2连接?)超过了该数字。
例如,我在工作中有一台服务器,它的MAX_CONCURRENT_STREAMS设置为128:
$ curl -iv -H "Content-Type: application/json" https://example.local

...
* Connection state changed (MAX_CONCURRENT_STREAMS == 128)!

但是我似乎可以每秒发起多达1000个请求,而不会收到任何错误信息:
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
​
public class TooManyConcurrentStreams1 {
​
    private static final int CONCURRENCY = 1000;
​
    public static void main(String[] args) {
        final var counter = new AtomicInteger();
        final var singletonHttpClient = newHttpClient();
        final var singletonRequest = newRequest();
        final var responses = new ArrayList<CompletableFuture<HttpResponse<Void>>>(CONCURRENCY);
​
        for (int i = 0; i < CONCURRENCY; i++) {
            responses.add(singletonHttpClient.sendAsync(singletonRequest, BodyHandlers.discarding()));
        }
​
        for (CompletableFuture<HttpResponse<Void>> response : responses) {
            response.thenAccept(x -> {});
            response.join();
            System.out.println(counter.incrementAndGet());
        }
​
        singletonHttpClient.executor().ifPresent(executor -> {
            if (executor instanceof ExecutorService executorService) {
                executorService.shutdown();
            }
        });
    }
​
    public static HttpRequest newRequest() {
        return HttpRequest.newBuilder()
                .uri(Constants.TEST_URI)
                .header("Content-Type", Constants.CONTENT_TYPE)
                .header("Accept", Constants.CONTENT_TYPE)
                .POST(HttpRequest.BodyPublishers.ofString(Constants.BODY))
                .build();
    }
​
    public static HttpClient newHttpClient() {
        return HttpClient.newBuilder()
                .version(HttpClient.Version.HTTP_2)
                .executor(Executors.newFixedThreadPool(CONCURRENCY))
                .build();
    }
}

当我将CONCURRENCY增加到像2000这样荒谬的数字时,我会遇到这个错误,而不是java.io.IOException: too many concurrent streams
Exception in thread "main" java.util.concurrent.CompletionException: java.net.SocketException: Connection reset
    at java.base/java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:368)
    at java.base/java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:377)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1152)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
    at java.net.http/jdk.internal.net.http.Stream.completeResponseExceptionally(Stream.java:1153)
    at java.net.http/jdk.internal.net.http.Stream.cancelImpl(Stream.java:1238)
    at java.net.http/jdk.internal.net.http.Stream.connectionClosing(Stream.java:1212)
    at java.net.http/jdk.internal.net.http.Http2Connection.shutdown(Http2Connection.java:710)
    at java.net.http/jdk.internal.net.http.Http2Connection$Http2TubeSubscriber.processQueue(Http2Connection.java:1323)
    at java.net.http/jdk.internal.net.http.common.SequentialScheduler$LockingRestartableTask.run(SequentialScheduler.java:205)
    at java.net.http/jdk.internal.net.http.common.SequentialScheduler$CompleteRestartableTask.run(SequentialScheduler.java:149)
    at java.net.http/jdk.internal.net.http.common.SequentialScheduler$SchedulableTask.run(SequentialScheduler.java:230)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)

然而,我可以用这段代码复现你的错误(我先遇到了这个错误,然后在这里找到了你的问题!)
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
​
public class TooManyConcurrentStreams2 {
​
    public static void main(String[] args) {
        final var singletonHttpClient = newHttpClient();
        final var singletonRequest = newRequest();
        final var counter = new AtomicInteger();
​
        final var scheduler = Executors.newScheduledThreadPool(2);
​
        scheduler.schedule(scheduler::shutdown, 1, TimeUnit.HOURS);
​
        scheduler.scheduleAtFixedRate(() -> {
            final var batchSize = counter.incrementAndGet();
            final var responses = new ArrayList<CompletableFuture<HttpResponse<Void>>>(batchSize);
​
            try {
                for (int i = 0; i < batchSize; i++) {
                    responses.add(
                            singletonHttpClient.sendAsync(
                                    singletonRequest,
                                    BodyHandlers.discarding()
                            )
                    );
                }
​
                for (CompletableFuture<HttpResponse<Void>> response : responses) {
                    response.thenAccept(x -> {
                    });
                    response.join();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
​
            System.out.println("batchSize = " + batchSize);
        }, 0, 500, TimeUnit.MILLISECONDS);
    }
​
​
    public static HttpRequest newRequest() {
        return HttpRequest.newBuilder()
                .uri(Constants.TEST_URI)
                .header("Content-Type", Constants.CONTENT_TYPE)
                .header("Accept", Constants.CONTENT_TYPE)
                .POST(HttpRequest.BodyPublishers.ofString(Constants.BODY))
                .build();
    }
​
    public static HttpClient newHttpClient() {
        return HttpClient.newBuilder()
                .version(HttpClient.Version.HTTP_2)
                .build();
    }
}

这个在我的每500毫秒执行一次的可运行程序中,在第128次失败了!
java.util.concurrent.CompletionException: java.io.IOException: too many concurrent streams
    at java.base/java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:368)
    at java.base/java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1189)
    at java.base/java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309)
    at java.net.http/jdk.internal.net.http.MultiExchange.responseAsyncImpl(MultiExchange.java:453)
    at java.net.http/jdk.internal.net.http.MultiExchange.lambda$responseAsync0$2(MultiExchange.java:341)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1150)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.io.IOException: too many concurrent streams

所以问题不在于每秒请求数量,而是其他一些东西,似乎是每个 HTTP 连接/客户端的并发打开流数量。
我们可以通过不为所有批量请求共享同一个 HTTP 客户端(和请求)来验证这一点:
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
​
public class TooManyConcurrentStreams2 {
​
    public static void main(String[] args) {
        final var counter = new AtomicInteger();
​
        final var scheduler = Executors.newScheduledThreadPool(2);
​
        scheduler.schedule(scheduler::shutdown, 1, TimeUnit.HOURS);
​
        scheduler.scheduleAtFixedRate(() -> {
            final var httpClient = newHttpClient();
            final var request = newRequest();
            final var batchSize = counter.incrementAndGet();
            
            final var responses = new ArrayList<CompletableFuture<HttpResponse<Void>>>(batchSize);
​
            try {
                for (int i = 0; i < batchSize; i++) {
                    responses.add(
                            httpClient.sendAsync(
                                    request,
                                    BodyHandlers.discarding()
                            )
                    );
                }
​
                for (CompletableFuture<HttpResponse<Void>> response : responses) {
                    response.thenAccept(x -> {
                    });
                    response.join();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
​
            System.out.println("batchSize = " + batchSize);
        }, 0, 500, TimeUnit.MILLISECONDS);
    }
​
​
    public static HttpRequest newRequest() {
        return HttpRequest.newBuilder()
                .uri(Constants.TEST_URI)
                .header("Content-Type", Constants.CONTENT_TYPE)
                .header("Accept", Constants.CONTENT_TYPE)
                .POST(HttpRequest.BodyPublishers.ofString(Constants.BODY))
                .build();
    }
​
    public static HttpClient newHttpClient() {
        return HttpClient.newBuilder()
                .version(HttpClient.Version.HTTP_2)
                .build();
    }
}

对我来说,这个在第143次尝试时失败了,并显示以下错误消息:
java.util.concurrent.CompletionException: java.lang.InternalError: java.net.SocketException: Too many open files
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1159)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.InternalError: java.net.SocketException: Too many open files
    at java.net.http/jdk.internal.net.http.PlainHttpConnection.<init>(PlainHttpConnection.java:293)
    at java.net.http/jdk.internal.net.http.AsyncSSLConnection.<init>(AsyncSSLConnection.java:49)
    at java.net.http/jdk.internal.net.http.HttpConnection.getSSLConnection(HttpConnection.java:293)
    at java.net.http/jdk.internal.net.http.HttpConnection.getConnection(HttpConnection.java:279)
    at java.net.http/jdk.internal.net.http.Http2Connection.createAsync(Http2Connection.java:369)
    at java.net.http/jdk.internal.net.http.Http2ClientImpl.getConnectionFor(Http2ClientImpl.java:128)
    at java.net.http/jdk.internal.net.http.ExchangeImpl.get(ExchangeImpl.java:93)
    at java.net.http/jdk.internal.net.http.Exchange.establishExchange(Exchange.java:343)
    at java.net.http/jdk.internal.net.http.Exchange.responseAsyncImpl0(Exchange.java:475)
    at java.net.http/jdk.internal.net.http.Exchange.responseAsyncImpl(Exchange.java:380)
    at java.net.http/jdk.internal.net.http.Exchange.responseAsync(Exchange.java:372)
    at java.net.http/jdk.internal.net.http.MultiExchange.responseAsyncImpl(MultiExchange.java:408)
    at java.net.http/jdk.internal.net.http.MultiExchange.lambda$responseAsync0$2(MultiExchange.java:341)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1150)
    ... 5 more

这很可能是因为我的笔记本电脑的ulimit相对较低,只有12544。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接