我看到很多关于Netty中分块流的问题,但是大部分都是关于出站流的解决方案,而不是入站流。
我想了解如何从通道获取数据,并将其作为InputStream发送到我的业务逻辑,而无需首先将所有数据加载到内存中。
以下是我尝试做的事情:
public class ServerRequestHandler extends MessageToMessageDecoder<HttpObject> {
private HttpServletRequest request;
private PipedOutputStream os;
private PipedInputStream is;
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx);
this.os = new PipedOutputStream();
this.is = new PipedInputStream(os);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
this.os.close();
this.is.close();
}
@Override
protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out)
throws Exception {
if (msg instanceof HttpRequest) {
this.request = new CustomHttpRequest((HttpRequest) msg, this.is);
out.add(this.request);
}
if (msg instanceof HttpContent) {
ByteBuf body = ((HttpContent) msg).content();
if (body.readableBytes() > 0)
body.readBytes(os, body.readableBytes());
if (msg instanceof LastHttpContent) {
os.close();
}
}
}
}
然后我还有另一个处理程序,将获取我的CustomHttpRequest并发送到我称为ServiceHandler的处理程序,其中我的业务逻辑将从InputStream中读取。
public class ServiceRouterHandler extends SimpleChannelInboundHandler<CustomHttpRequest> {
...
@Override
public void channelRead0(ChannelHandlerContext ctx, CustomHttpRequest request) throws IOException {
...
future = serviceHandler.handle(request, response);
...
这种方法不起作用,因为当我的处理程序将CustomHttpRequest转发到ServiceHandler时,它试图从InputStream中读取数据时,线程会被阻塞,HttpContent永远无法在我的解码器中处理。
我知道我可以尝试为我的业务逻辑创建一个单独的线程,但我觉得这样会过于复杂。 我看了一下ByteBufInputStream,但它说:
请注意,它只会读取在构造时确定的可读字节数。
所以我认为它对分块的 Http 请求不起作用。此外,我看到有 ChunkedWriteHandler,它似乎适用于输出流块,但我找不到与之相应的 ChunkedReadHandler...
那么我的问题是:最好的方法是什么?我的要求如下: - 在发送到 ServiceHandlers 之前不要在内存中保留数据; - ServiceHandlers 的 API 应该与 netty 无关(这就是为什么我使用自定义 CustomHttpRequest 而不是 Netty 的 HttpRequest)。
更新: 我已经通过更加反应灵敏的方法在 CustomHttpRequest 上实现了这个功能。现在,请求不再提供 InputStream 给 ServiceHandlers 读取(这会导致阻塞),而是 CustomHttpRequest 现在具有 readInto(OutputStream) 方法,该方法返回一个 Future,并且只有在 Outputstream 被填充满后才会执行所有的服务处理程序。以下是其外观:
public class CustomHttpRequest {
...constructors and other methods hidden...
private final SettableFuture<Void> writeCompleteFuture = SettableFuture.create();
private final SettableFuture<OutputStream> outputStreamFuture = SettableFuture.create();
private ListenableFuture<Void> lastWriteFuture = Futures.transform(outputStreamFuture, x-> null);
public ListenableFuture<Void> readInto(OutputStream os) throws IOException {
outputStreamFuture.set(os);
return this.writeCompleteFuture;
}
ListenableFuture<Void> writeChunk(byte[] buf) {
this.lastWriteFuture = Futures.transform(lastWriteFuture, (AsyncFunction<Void, Void>) (os) -> {
outputStreamFuture.get().write(buf);
return Futures.immediateFuture(null);
});
return lastWriteFuture;
}
void complete() {
ListenableFuture<Void> future =
Futures.transform(lastWriteFuture, (AsyncFunction<Void, Void>) x -> {
outputStreamFuture.get().close();
return Futures.immediateFuture(null);
});
addFinallyCallback(future, () -> {
this.writeCompleteFuture.set(null);
});
}
}
我的更新后的ServletRequestHandler看起来像这样:
public class ServerRequestHandler extends MessageToMessageDecoder<HttpObject> {
private NettyHttpServletRequestAdaptor request;
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
}
@Override
protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out)
throws Exception {
if (msg instanceof HttpRequest) {
HttpRequest request = (HttpRequest) msg;
this.request = new CustomHttpRequest(request, ctx.channel());
out.add(this.request);
}
if (msg instanceof HttpContent) {
ByteBuf buf = ((HttpContent) msg).content();
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
this.request.writeChunk(bytes);
if (msg instanceof LastHttpContent) {
this.request.complete();
}
}
}
}
这个方法效果还不错,但需要注意的是所有操作都在单线程中进行,对于大数据量的情况,我们可能需要创建一个新线程以释放当前线程用于其他渠道。