Java异步HTTP客户端:从LazyResponseBodyPart写入AsynchronousFileChannel时文件损坏

7

我使用 AsyncHttpClient库 来进行异步非阻塞请求。

我的情况是:接收到数据后,将数据写入文件。

为了从远程主机下载文件并保存到文件中,我使用默认的 ResponseBodyPartFactory.EAGERAsynchronousFileChannel,以便在数据到达时不阻塞netty线程。但是根据我的测试结果,与 LAZY 相比,在Java堆上的内存消耗增加了多倍。

因此,我决定直接使用 LAZY,但没有考虑到文件的后果。

此代码将帮助重现该问题:

public static class AsyncChannelWriter {
     private final CompletableFuture<Integer> startPosition;
     private final AsynchronousFileChannel channel;

     public AsyncChannelWriter(AsynchronousFileChannel channel) throws IOException {
         this.channel = channel;
         this.startPosition = CompletableFuture.completedFuture((int) channel.size());
     }

     public CompletableFuture<Integer> getStartPosition() {
         return startPosition;
     }

     public CompletableFuture<Integer> write(ByteBuffer byteBuffer, CompletableFuture<Integer> currentPosition) {

         return currentPosition.thenCompose(position -> {
             CompletableFuture<Integer> writenBytes = new CompletableFuture<>();
             channel.write(byteBuffer, position, null, new CompletionHandler<Integer, ByteBuffer>() {
                 @Override
                 public void completed(Integer result, ByteBuffer attachment) {
                     writenBytes.complete(result);
                 }

                 @Override
                 public void failed(Throwable exc, ByteBuffer attachment) {
                     writenBytes.completeExceptionally(exc);
                 }
             });
             return writenBytes.thenApply(writenBytesLength -> writenBytesLength + position);
         });
     }

     public void close(CompletableFuture<Integer> currentPosition) {
         currentPosition.whenComplete((position, throwable) -> IOUtils.closeQuietly(channel));
     }
 }

 public static void main(String[] args) throws IOException {
     final String filepath = "/media/veracrypt4/files/1.jpg";
     final String downloadUrl = "https://m0.cl/t/butterfly-3000.jpg";

     final AsyncHttpClient client = Dsl.asyncHttpClient(Dsl.config().setFollowRedirect(true)
             .setResponseBodyPartFactory(AsyncHttpClientConfig.ResponseBodyPartFactory.LAZY));
     final AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get(filepath), StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE);
     final AsyncChannelWriter asyncChannelWriter = new AsyncChannelWriter(channel);
     final AtomicReference<CompletableFuture<Integer>> atomicReferencePosition = new AtomicReference<>(asyncChannelWriter.getStartPosition());
     client.prepareGet(downloadUrl)
             .execute(new AsyncCompletionHandler<Response>() {

                 @Override
                 public State onBodyPartReceived(HttpResponseBodyPart content) throws Exception {
//if EAGER, content.getBodyByteBuffer() return HeapByteBuffer, if LAZY, return DirectByteBuffer
                     final ByteBuffer bodyByteBuffer = content.getBodyByteBuffer();
                     final CompletableFuture<Integer> currentPosition = atomicReferencePosition.get();
                     final CompletableFuture<Integer> newPosition = asyncChannelWriter.write(bodyByteBuffer, currentPosition);
                     atomicReferencePosition.set(newPosition);
                     return State.CONTINUE;
                 }

                 @Override
                 public Response onCompleted(Response response) {
                     asyncChannelWriter.close(atomicReferencePosition.get());
                     return response;
                 }
             });
}

在这种情况下,图片被损坏了。但如果我使用FileChannel而不是AsynchronousFileChannel,那么在两种情况下,文件都会正常生成。在使用DirectByteBuffer(例如LazyResponseBodyPart.getBodyByteBuffer())和AsynchronousFileChannel时,是否存在细微差别?
如果一切正常,为什么我的代码会有问题,如果我使用?
更新
我注意到,如果我使用LAZY,并在方法onBodyPartReceived中添加行Thread.sleep(10),就像这样:
 @Override
public State onBodyPartReceived(HttpResponseBodyPart content) throws Exception {
    final ByteBuffer bodyByteBuffer = content.getBodyByteBuffer();
    final CompletableFuture<Integer> currentPosition = atomicReferencePosition.get();
    final CompletableFuture<Integer> newPosition = finalAsyncChannelWriter.write(bodyByteBuffer, currentPosition);
    atomicReferencePosition.set(newPosition);
    Thread.sleep(10);
    return State.CONTINUE;
}

文件以非损坏状态保存到磁盘中。

据我了解,原因是在这10毫秒内,AsynchronousFileChannel 中的异步线程成功地从 DirectByteBuffer 将数据写入磁盘。

结果文件会因为这个异步线程与netty线程一起使用这个缓冲区进行写入而变得损坏。

如果我们查看带有 EagerResponseBodyPart 的源代码,那么我们将看到以下内容:

private final byte[] bytes;
  public EagerResponseBodyPart(ByteBuf buf, boolean last) {
    super(last);
    bytes = byteBuf2Bytes(buf);
  }

  @Override
  public ByteBuffer getBodyByteBuffer() {
    return ByteBuffer.wrap(bytes);
  }

因此,当一段数据到达时,它会立即存储在字节数组中。然后,我们可以将它们安全地封装在HeapByteBuffer中,并传输到文件通道的异步线程中。
但是,如果您查看代码LazyResponseBodyPart
  private final ByteBuf buf;

  public LazyResponseBodyPart(ByteBuf buf, boolean last) {
    super(last);
    this.buf = buf;
  }
  @Override
  public ByteBuffer getBodyByteBuffer() {
    return buf.nioBuffer();
  }

正如您所看到的,我们实际上在异步文件通道线程中使用了一个名为Netty ByteBuff(在这种情况下始终是PooledSlicedByteBuf),通过方法调用nioBuffer来实现。

在这种情况下,我该怎么做,如何安全地传递DirectByteBuffer到异步线程中而不将缓冲区复制到Java堆中?


为什么不使用 BodyDeferringAsyncHandler 来简化生活呢? - Mạnh Quyết Nguyễn
@MạnhQuyếtNguyễn 因为它不够有效吗?我使用这个客户端来减少内存消耗和CPU资源。为了简单起见,我可以使用一个Apache同步客户端。 顺便说一下,BodyDeferringAsyncHandler在内存消耗方面与我使用EAGER的示例没有区别,因为BodyDeferringAsyncHandler使用getBodyPartBytes方法。我不确定,但可能在使用BodyDeferringAsyncHandler时,线程会在写入OutputStream时阻塞。 - Peter Kozlovsky
@MạnhQuyếtNguyễn 当然可以,但是处理数据的线程将会被阻塞。 - Peter Kozlovsky
总会有一个线程被阻塞:那个实际写入数据的线程。 - Mạnh Quyết Nguyễn
@MạnhQuyếtNguyễn 如果我们谈论光盘上的写入,那么是的,在我的代码中使用AsynchronousFileChannel时,这是一个阻塞操作,因为Java在Linux中没有实现真正的文件AIO。当数据通过网络到达时,AsyncHttpClient线程不会被阻塞,当数据到达onBodyPartRecieved方法时,我们不应该阻塞netty线程。 然而,我们已经偏离了最初的问题。 - Peter Kozlovsky
显示剩余2条评论
1个回答

1
我和AsyncHttpClient的维护者进行了交流。可以在这里看到
主要问题是我没有使用netty ByteBuf方法retainrelease。 最终,我想出了两个解决方案。 第一个:使用CompletableFuture按顺序将字节写入文件,并跟踪位置。 定义AsynchronousFileChannel的包装类。
@Log4j2
public class AsyncChannelNettyByteBufWriter implements Closeable {
    private final AtomicReference<CompletableFuture<Long>> positionReference;
    private final AsynchronousFileChannel channel;

    public AsyncChannelNettyByteBufWriter(AsynchronousFileChannel channel) {
        this.channel = channel;
        try {
            this.positionReference = new AtomicReference<>(CompletableFuture.completedFuture(channel.size()));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public CompletableFuture<Long> write(ByteBuf byteBuffer) {
        final ByteBuf byteBuf = byteBuffer.retain();
        return positionReference.updateAndGet(x -> x.thenCompose(position -> {
            final CompletableFuture<Integer> writenBytes = new CompletableFuture<>();
            channel.write(byteBuf.nioBuffer(), position, byteBuf, new CompletionHandler<Integer, ByteBuf>() {
                @Override
                public void completed(Integer result, ByteBuf attachment) {
                    attachment.release();
                    writenBytes.complete(result);
                }

                @Override
                public void failed(Throwable exc, ByteBuf attachment) {
                    attachment.release();
                    log.error(exc);
                    writenBytes.completeExceptionally(exc);
                }
            });
            return writenBytes.thenApply(writenBytesLength -> writenBytesLength + position);
        }));
    }

    public void close() {
        positionReference.updateAndGet(x -> x.whenComplete((position, throwable) -> {
            try {
                channel.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }));
    }
}

事实上,如果记录发生在一个线程中,则这里可能不会有AtomicReference,如果来自多个线程,则我们需要严格考虑同步。
并且主要用途。
public static void main(String[] args) throws IOException {
    final String filepath = "1.jpg";
    final String downloadUrl = "https://m0.cl/t/butterfly-3000.jpg";
    final AsyncHttpClient client = Dsl.asyncHttpClient(Dsl.config().setFollowRedirect(true)
            .setResponseBodyPartFactory(AsyncHttpClientConfig.ResponseBodyPartFactory.LAZY));
    final AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get(filepath), StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE);
    final AsyncChannelNettyByteBufWriter asyncChannelNettyByteBufWriter = new AsyncChannelNettyByteBufWriter(channel);

    client.prepareGet(downloadUrl)
            .execute(new AsyncCompletionHandler<Response>() {
                @Override
                public State onBodyPartReceived(HttpResponseBodyPart content) {
                    final ByteBuf byteBuf = ((LazyResponseBodyPart) content).getBuf();
                    asyncChannelNettyByteBufWriter.write(byteBuf);
                    return State.CONTINUE;
                }

                @Override
                public Response onCompleted(Response response) {
                    asyncChannelNettyByteBufWriter.close();
                    return response;
                }
            });
}

第二种解决方案:基于接收到的字节数跟踪位置。
public static void main(String[] args) throws IOException {
    final String filepath = "1.jpg";
    final String downloadUrl = "https://m0.cl/t/butterfly-3000.jpg";
    final AsyncHttpClient client = Dsl.asyncHttpClient(Dsl.config().setFollowRedirect(true)
            .setResponseBodyPartFactory(AsyncHttpClientConfig.ResponseBodyPartFactory.LAZY));
    final ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
    final AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get(filepath), new HashSet<>(Arrays.asList(StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE)), executorService);

    client.prepareGet(downloadUrl)
            .execute(new AsyncCompletionHandler<Response>() {

                private long position = 0;
                @Override
                public State onBodyPartReceived(HttpResponseBodyPart content) {
                    final ByteBuf byteBuf = ((LazyResponseBodyPart) content).getBuf().retain();
                    long currentPosition = position;
                    position+=byteBuf.readableBytes();
                    channel.write(byteBuf.nioBuffer(), currentPosition, byteBuf, new CompletionHandler<Integer, ByteBuf>() {
                        @Override
                        public void completed(Integer result, ByteBuf attachment) {
                            attachment.release();
                            if(content.isLast()){
                                try {
                                    channel.close();
                                } catch (IOException e) {
                                    throw new UncheckedIOException(e);
                                }
                            }
                        }

                        @Override
                        public void failed(Throwable exc, ByteBuf attachment) {
                            attachment.release();
                            try {
                                channel.close();
                            } catch (IOException e) {
                                throw new UncheckedIOException(e);
                            }
                        }
                    });
                    return State.CONTINUE;
                }
                @Override
                public Response onCompleted(Response response) {
                    return response;
                }
            });
}

在第二种解决方案中,由于我们不等待一些字节被写入文件,AsynchronousFileChannel 可以创建很多线程(如果你使用 Linux,因为 Linux 没有实现非阻塞异步文件 IO。在 Windows 中,情况要好得多)。
正如我的测量结果所示,在向缓慢的 USB 闪存写入时,线程数可以达到数万个,因此您需要通过创建自己的 ExecutorService 并将其传递给 AsynchronousFileChannel 来限制线程数。
第一种和第二种解决方案有明显的优缺点吗?我很难说。也许有人可以告诉什么更有效。

你提出的第一个解决方案也不会遇到相同的线程问题吗?因为它也使用了 AsynchronousFileChannel - shays10
@shays10 将会减少,但明显较少。 因为在第一个解决方案中,字节是按顺序写入文件的,而新的一部分字节不会被写入,直到前面的字节被写入。 在第二种情况下,我们不等待前面的字节部分被写入文件,因此在记录速度较慢的情况下,我们会创建大量线程。 您可以通过在慢存储介质(例如廉价闪存驱动器)上模拟录音并检查创建了多少个线程来进行检查。 - Peter Kozlovsky

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接