Netty分块输入流

3

我看到很多关于Netty中分块流的问题,但是大部分都是关于出站流的解决方案,而不是入站流。

我想了解如何从通道获取数据,并将其作为InputStream发送到我的业务逻辑,而无需首先将所有数据加载到内存中。
以下是我尝试做的事情:

public class ServerRequestHandler extends MessageToMessageDecoder<HttpObject> {

  private HttpServletRequest request;
  private PipedOutputStream os;
  private PipedInputStream is;

  @Override
  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    super.handlerAdded(ctx);
    this.os = new PipedOutputStream();
    this.is = new PipedInputStream(os);
  }

  @Override
  public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    super.handlerRemoved(ctx);
    this.os.close();
    this.is.close();
  }

  @Override
  protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out)
      throws Exception {
    if (msg instanceof HttpRequest) {
      this.request = new CustomHttpRequest((HttpRequest) msg, this.is);
      out.add(this.request);
    }
    if (msg instanceof HttpContent) {
      ByteBuf body = ((HttpContent) msg).content();

      if (body.readableBytes() > 0)
        body.readBytes(os, body.readableBytes());

      if (msg instanceof LastHttpContent) {
        os.close();
      }
    }

  }

}

然后我还有另一个处理程序,将获取我的CustomHttpRequest并发送到我称为ServiceHandler的处理程序,其中我的业务逻辑将从InputStream中读取。

public class ServiceRouterHandler extends SimpleChannelInboundHandler<CustomHttpRequest> {
...
    @Override
    public void channelRead0(ChannelHandlerContext ctx, CustomHttpRequest request) throws IOException {
...
        future = serviceHandler.handle(request, response);
...

这种方法不起作用,因为当我的处理程序将CustomHttpRequest转发到ServiceHandler时,它试图从InputStream中读取数据时,线程会被阻塞,HttpContent永远无法在我的解码器中处理。
我知道我可以尝试为我的业务逻辑创建一个单独的线程,但我觉得这样会过于复杂。 我看了一下ByteBufInputStream,但它说:
请注意,它只会读取在构造时确定的可读字节数。
所以我认为它对分块的 Http 请求不起作用。此外,我看到有 ChunkedWriteHandler,它似乎适用于输出流块,但我找不到与之相应的 ChunkedReadHandler...
那么我的问题是:最好的方法是什么?我的要求如下: - 在发送到 ServiceHandlers 之前不要在内存中保留数据; - ServiceHandlers 的 API 应该与 netty 无关(这就是为什么我使用自定义 CustomHttpRequest 而不是 Netty 的 HttpRequest)。
更新: 我已经通过更加反应灵敏的方法在 CustomHttpRequest 上实现了这个功能。现在,请求不再提供 InputStream 给 ServiceHandlers 读取(这会导致阻塞),而是 CustomHttpRequest 现在具有 readInto(OutputStream) 方法,该方法返回一个 Future,并且只有在 Outputstream 被填充满后才会执行所有的服务处理程序。以下是其外观:
public class CustomHttpRequest {
  ...constructors and other methods hidden...
  private final SettableFuture<Void> writeCompleteFuture = SettableFuture.create();

  private final SettableFuture<OutputStream> outputStreamFuture = SettableFuture.create();

  private ListenableFuture<Void> lastWriteFuture = Futures.transform(outputStreamFuture, x-> null);

  public ListenableFuture<Void> readInto(OutputStream os) throws IOException {
    outputStreamFuture.set(os);
    return this.writeCompleteFuture;
  }

  ListenableFuture<Void> writeChunk(byte[] buf) {
    this.lastWriteFuture = Futures.transform(lastWriteFuture, (AsyncFunction<Void, Void>) (os) -> {
      outputStreamFuture.get().write(buf);
      return Futures.immediateFuture(null);
    });
    return lastWriteFuture;
  }


  void complete() {
    ListenableFuture<Void> future =
        Futures.transform(lastWriteFuture, (AsyncFunction<Void, Void>) x -> {
          outputStreamFuture.get().close();
          return Futures.immediateFuture(null);
        });
    addFinallyCallback(future, () -> {
      this.writeCompleteFuture.set(null);
    });

  }
}

我的更新后的ServletRequestHandler看起来像这样:

public class ServerRequestHandler extends MessageToMessageDecoder<HttpObject> {

  private NettyHttpServletRequestAdaptor request;

  @Override
  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    super.handlerAdded(ctx);
  }

  @Override
  public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    super.handlerRemoved(ctx);
  }


  @Override
  protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out)
      throws Exception {
    if (msg instanceof HttpRequest) {
      HttpRequest request = (HttpRequest) msg;

      this.request = new CustomHttpRequest(request, ctx.channel());

      out.add(this.request);
    }
    if (msg instanceof HttpContent) {
      ByteBuf buf = ((HttpContent) msg).content();
      byte[] bytes = new byte[buf.readableBytes()];
      buf.readBytes(bytes);

      this.request.writeChunk(bytes);

      if (msg instanceof LastHttpContent) {
        this.request.complete();
      }
    }
  }
}

这个方法效果还不错,但需要注意的是所有操作都在单线程中进行,对于大数据量的情况,我们可能需要创建一个新线程以释放当前线程用于其他渠道。


常规的InputStream或OutputStream不能正常工作吗?PipedInput/OutputStream在文档中指出:“不建议在单个线程中使用两个对象,因为它可能会导致线程死锁”。 - aglassman
我建议尝试使用BufferedInput/OutputStreams。 - aglassman
我也担心ServerRequestHandler。除非每个请求都创建一个新的类,否则一旦有多个请求,这将失败。 - aglassman
关于ServerRequestHandler的问题,也许您不熟悉Netty,或者我在这里漏掉了什么。在Netty中,每个处理程序只由一个线程按设计调用,因此我认为这是安全的。 - Murilo Tavares
没关系,我明白你所说的ServerRequestHandler问题了,确实这是一个需要通过在收到LastHttpContent时重置状态来处理的问题。感谢你指出这一点。但是,我认为我的主要担忧仍然是有效的。 - Murilo Tavares
显示剩余3条评论
1个回答

0

你走在正确的轨道上 - 如果你的 serviceHandler.handle(request, response); 调用正在进行阻塞读取,那么你需要为它创建一个新线程。记住,Netty worker 线程应该只有很少的数量,所以你不应该在 worker 线程中进行任何阻塞调用。

另一个要问的问题是,你的服务处理程序是否需要阻塞?它做什么?如果它无论如何都在网络上传输数据,你能否以非阻塞的方式将其合并到 Netty pipeline 中?这样,一切都是异步的,没有阻塞调用和额外的线程。


谢谢您的回复。我刚更新了我的问题,提供了一种解决方案,其中服务处理程序在读取时不会阻塞。但是我很好奇您所说的“为其创建Netty管道”是什么意思。 - Murilo Tavares
抱歉,之前表述不够清晰。我刚刚修改了一下,改成“你能将其整合到Netty管道中吗?”我的意思是,只有当你需要与那些使用阻塞读取方式的库进行接口交互时,才需要使用InputStream进行阻塞读取。如果你完全掌控自己的应用程序,就可以按照Netty的方式使用通道/处理程序。 - spinlok
这是将现有服务器迁移到Netty的一部分。该服务器允许开发人员创建自己的服务处理程序。考虑到这一点,我们更喜欢通过创建作为服务处理程序的调度程序的Netty处理程序来保持服务处理程序API与Netty无关。因此,将每个服务处理程序添加到Netty管道中对我来说并不是很好的选择。 - Murilo Tavares

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接