Java 21内置的HTTP客户端将载体线程固定住。

18
我正在使用Java Corretto 21.0.0.35.1 build 21+35-LTS,以及内置的Java HTTP客户端来获取响应作为InputStream。我正在使用虚拟线程进行并行请求,大部分情况下都运行良好。然而,偶尔在我的测试中会遇到一个"固定"事件,如下面的堆栈跟踪所示。
我相信JDK已经更新以完全支持虚拟线程,在我理解中,HTTP客户端不应该完全固定一个载体线程。然而,当读取并(自动)关闭InputStream时,有时会出现这种固定事件。
这种行为是否符合预期,或者仍然可能是JDK中的一个错误?
HttpResponse<InputStream> response = httpClient.send(request, HttpResponse.BodyHandlers.ofInputStream());
try (InputStream responseBody = response.body()) {
  return parser.parse(responseBody); // LINE 52 in the trace below
}

追踪
* Pinning event captured:
  java.lang.VirtualThread.parkOnCarrierThread(java.lang.VirtualThread.java:687)
  java.lang.VirtualThread.park(java.lang.VirtualThread.java:603)
  java.lang.System$2.parkVirtualThread(java.lang.System$2.java:2639)
  jdk.internal.misc.VirtualThreads.park(jdk.internal.misc.VirtualThreads.java:54)
  java.util.concurrent.locks.LockSupport.park(java.util.concurrent.locks.LockSupport.java:219)
  java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.util.concurrent.locks.AbstractQueuedSynchronizer.java:754)
  java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.util.concurrent.locks.AbstractQueuedSynchronizer.java:990)
  java.util.concurrent.locks.ReentrantLock$Sync.lock(java.util.concurrent.locks.ReentrantLock$Sync.java:153)
  java.util.concurrent.locks.ReentrantLock.lock(java.util.concurrent.locks.ReentrantLock.java:322)
  sun.nio.ch.SocketChannelImpl.implCloseNonBlockingMode(sun.nio.ch.SocketChannelImpl.java:1091)
  sun.nio.ch.SocketChannelImpl.implCloseSelectableChannel(sun.nio.ch.SocketChannelImpl.java:1124)
  java.nio.channels.spi.AbstractSelectableChannel.implCloseChannel(java.nio.channels.spi.AbstractSelectableChannel.java:258)
  java.nio.channels.spi.AbstractInterruptibleChannel.close(java.nio.channels.spi.AbstractInterruptibleChannel.java:113)
  jdk.internal.net.http.PlainHttpConnection.close(jdk.internal.net.http.PlainHttpConnection.java:427)
  jdk.internal.net.http.PlainHttpConnection.close(jdk.internal.net.http.PlainHttpConnection.java:406)
  jdk.internal.net.http.Http1Response.lambda$readBody$1(jdk.internal.net.http.Http1Response.java:355)
  jdk.internal.net.http.Http1Response$$Lambda+0x00007f4cb5e6c438.749276779.accept(jdk.internal.net.http.Http1Response$$Lambda+0x00007f4cb5e6c438.749276779.java:-1)
  jdk.internal.net.http.ResponseContent$ChunkedBodyParser.onError(jdk.internal.net.http.ResponseContent$ChunkedBodyParser.java:185)
  jdk.internal.net.http.Http1Response$BodyReader.onReadError(jdk.internal.net.http.Http1Response$BodyReader.java:677)
  jdk.internal.net.http.Http1AsyncReceiver.checkForErrors(jdk.internal.net.http.Http1AsyncReceiver.java:302)
  jdk.internal.net.http.Http1AsyncReceiver.flush(jdk.internal.net.http.Http1AsyncReceiver.java:268)
  jdk.internal.net.http.Http1AsyncReceiver$$Lambda+0x00007f4cb5e31228.555093431.run(jdk.internal.net.http.Http1AsyncReceiver$$Lambda+0x00007f4cb5e31228.555093431.java:-1)
  jdk.internal.net.http.common.SequentialScheduler$LockingRestartableTask.run(jdk.internal.net.http.common.SequentialScheduler$LockingRestartableTask.java:182)
  jdk.internal.net.http.common.SequentialScheduler$CompleteRestartableTask.run(jdk.internal.net.http.common.SequentialScheduler$CompleteRestartableTask.java:149)
  jdk.internal.net.http.common.SequentialScheduler$SchedulableTask.run(jdk.internal.net.http.common.SequentialScheduler$SchedulableTask.java:207)
  jdk.internal.net.http.HttpClientImpl$DelegatingExecutor.execute(jdk.internal.net.http.HttpClientImpl$DelegatingExecutor.java:177)
  jdk.internal.net.http.common.SequentialScheduler.runOrSchedule(jdk.internal.net.http.common.SequentialScheduler.java:282)
  jdk.internal.net.http.common.SequentialScheduler.runOrSchedule(jdk.internal.net.http.common.SequentialScheduler.java:251)
  jdk.internal.net.http.Http1AsyncReceiver.onReadError(jdk.internal.net.http.Http1AsyncReceiver.java:516)
  jdk.internal.net.http.Http1AsyncReceiver.lambda$handlePendingDelegate$3(jdk.internal.net.http.Http1AsyncReceiver.java:380)
  jdk.internal.net.http.Http1AsyncReceiver$$Lambda+0x00007f4cb5e33ca0.84679411.run(jdk.internal.net.http.Http1AsyncReceiver$$Lambda+0x00007f4cb5e33ca0.84679411.java:-1)
  jdk.internal.net.http.Http1AsyncReceiver$Http1AsyncDelegateSubscription.cancel(jdk.internal.net.http.Http1AsyncReceiver$Http1AsyncDelegateSubscription.java:163)
  jdk.internal.net.http.common.HttpBodySubscriberWrapper$SubscriptionWrapper.cancel(jdk.internal.net.http.common.HttpBodySubscriberWrapper$SubscriptionWrapper.java:92)
  jdk.internal.net.http.ResponseSubscribers$HttpResponseInputStream.close(jdk.internal.net.http.ResponseSubscribers$HttpResponseInputStream.java:653)

  com.acme.service.server.StatusClient.getResponse(com.acme.service.server.StatusClient.java:52)
  com.acme.service.server.StatusClient_ClientProxy.getResponse(com.acme.service.server.StatusClient_ClientProxy.java:-1)
  com.acme.client.Request.execute(com.acme.client.Request.java:96)
  com.acme.service.server.serviceStatusProvider.getStatusHistorys(com.acme.service.server.serviceStatusProvider.java:237)
  com.acme.service.api.RemoteStatusCheck.getStatusHistory(com.acme.service.api.RemoteStatusCheck.java:163)
  com.acme.service.api.RemoteStatusCheck.lambda$doChecks$0(com.acme.service.api.RemoteStatusCheck.java:132)
  com.acme.service.api.RemoteStatusCheck$$Lambda+0x00007f4cb9f0d8d0.979953307.call(com.acme.service.api.RemoteStatusCheck$$Lambda+0x00007f4cb9f0d8d0.979953307.java:-1)
  java.util.concurrent.FutureTask.run(java.util.concurrent.FutureTask.java:317)
  java.lang.VirtualThread.runWith(java.lang.VirtualThread.java:341)
  java.lang.VirtualThread.run(java.lang.VirtualThread.java:311)
  java.lang.VirtualThread$VThreadContinuation$1.run(java.lang.VirtualThread$VThreadContinuation$1.java:192)
  jdk.internal.vm.Continuation.enter0(jdk.internal.vm.Continuation.java:320)
  jdk.internal.vm.Continuation.enter(jdk.internal.vm.Continuation.java:312)
  jdk.internal.vm.Continuation.enterSpecial(jdk.internal.vm.Continuation.java:-1)

可能与这个问题相关:https://stackoverflow.com/questions/77262427/how-virtual-thread-is-parking - undefined
1个回答

22
方法 java.nio.channels.spi.AbstractInterruptibleChannel.close()(Temurin-21+35(构建21+35-LTS)中的108-115行,但可能适用于所有OpenJDK衍生版本)的实现如下:
public final void close() throws IOException {
    synchronized (closeLock) {
        if (closed)
            return;
        closed = true;
        implCloseChannel();
    }
}

在你的堆栈跟踪中,第113行对应于implCloseChannel()的调用,这也对应于堆栈跟踪中的前一行,而且这是在那个同步块的中间。如果虚拟线程在synchronized块中被停放/阻塞,它们将被固定,这就是为什么它被固定的原因。
换句话说,根据代码的现状,固定是预期的和正确的行为,因此不是一个错误。
至于在JDK中消除同步块时使用synchronized的原因是一个疏忽,还是因为有特定的原因仍然使用synchronized,我不知道。鉴于它是一个私有锁对象,我猜应该可以通过用ReentrantLock或类似的东西替换它来摆脱它(即它不是通道的“API”的一部分),但也许现在还有其他实现原因要保留它。
我在nio-dev列表的线程Should AbstractInterruptibleChannel.close() still use a synchronized block?中询问了这个问题。
Alan Bateman在那里回答说:
我们决定不值得去做,因为很少设置SO_LINGER。当关闭时,由于读锁或写锁的争用而导致的临时固定是可以接受的。
与此同时,我们正在努力消除对同步块的限制。我们希望很快在loom存储库中有所进展。

感谢您的回复和在开发者列表中发布!我希望它能很快修复 :) 短时间的固定应该不会成为问题,对吧? - undefined
@Urb 不过,据我了解,它不应该这样 - 在负载下,它可能会启动额外的平台线程来补偿固定的线程(这可能是优点也可能是缺点)。 - undefined
1
@Urb 当然可以,但是固定(pinning)并不是一个错误,你不应该将其视为错误。 - undefined
3
@Urb 只是一个性能问题。而且,就像所有潜在的性能问题一样,只有在遇到性能不佳的情况下才会出现性能问题。因此,如果您的应用程序出现实际的性能问题,请打开固定事件(和其他潜在问题)的日志记录,但不要过早地打开。 - undefined
3
@Holger没错。这个想法是通过一个集成测试来捕捉固定事件。事实上,当使用Apache http客户端时,测试发现一个请求中有数百个固定事件。所以我们用内置的客户端替换了Apache客户端,现在只偶尔会出现一个固定事件。在这种情况下,这可能不是一个问题,但是代码可能会发生变化,我们希望尽早了解固定事件,并决定它是否是一个真正的问题。 - undefined
显示剩余5条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接