Netty实现的管道流传输

4
我正在尝试实现一个“缩略图生成器”作为微服务。我认为这样的东西最好作为TCP服务器,因此在简要调查了几个选项后,我选择了Netty。为了使服务尽可能节省内存,我希望避免将完整的图像加载到内存中,因此一直在尝试构建一个管道,其中“ThumbnailHandler”可以使用管道流利用Netty的分块读取,以便随着Netty接收更多字节,缩略图生成器可以遍历更多的流。不幸的是,我对Netty或NIO模式不太熟悉,不知道是否采取了最佳方法,而且我甚至无法让简化版本按照我的期望工作。

这是我的服务器设置:

public class ThumbnailerServer {

    private int port;

    public ThumbnailerServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        final ThreadFactory acceptFactory = new DefaultThreadFactory("accept");
        final ThreadFactory connectFactory = new DefaultThreadFactory("connect");
        final NioEventLoopGroup acceptGroup = new NioEventLoopGroup(1, acceptFactory, NioUdtProvider.BYTE_PROVIDER);
        final NioEventLoopGroup connectGroup = new NioEventLoopGroup(0, connectFactory, NioUdtProvider.BYTE_PROVIDER);

        try {
            ServerBootstrap b = new ServerBootstrap();

            b.group(acceptGroup, connectGroup)
             .channelFactory(NioUdtProvider.BYTE_ACCEPTOR)
             .option(ChannelOption.SO_BACKLOG, 128)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<UdtChannel>() {
                 @Override
                 public void initChannel(UdtChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     p.addLast("handler", new ThumbnailerServerHandler());
                 }
             });

            // bind and start to accept incoming connections.
            b.bind(port).sync().channel().closeFuture().sync();
        } finally {
            connectGroup.shutdownGracefully();
            acceptGroup.shutdownGracefully();
        }
    }

}

还有缩略图处理程序:

public class ThumbnailerServerHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private static final Logger logger = LoggerFactory.getLogger(ThumbnailerServerHandler.class);
    private PipedInputStream toThumbnailer = new PipedInputStream();
    private PipedOutputStream fromClient = new PipedOutputStream(toThumbnailer);

    private static final ListeningExecutorService executor = MoreExecutors.listeningDecorator(
            Executors.newFixedThreadPool(5));

    private ListenableFuture<OutputStream> future;

    public ThumbnailerServerHandler() throws IOException {
        super(ByteBuf.class, true);
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        future = executor.submit(() -> ThumbnailGenerator.generate(toThumbnailer));
        future.addListener(() -> {
            try {
                ctx.writeAndFlush(future.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }, executor);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        this.fromClient.close();
        this.toThumbnailer.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        int readableBytes = msg.readableBytes();
        msg.readBytes(this.fromClient, readableBytes);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.error("Encountered error during communication", cause);
        ctx.close();
    }


}

在我完全掌握整个流程之前,这是我简化的“缩略图生成器”:

public class ThumbnailGenerator {

    public static OutputStream generate(InputStream toThumbnailer) {
        OutputStream stream = new ByteArrayOutputStream();
        try {
            IOUtils.copy(toThumbnailer, stream);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return stream;
    }

}
  1. 在handlerAdded方法中启动异步任务是否合适?有没有更“netty”的做法?
  2. IOUtils.copy阻塞式读取(由于管道输入流的读取)直到有可读数据,因此我将其转移到执行程序池中,因为如果我要继续接收字节,则不能在处理程序中阻塞。但是,我发现这个过程永远不会完成,但它确实在进展。那是因为我从未遇到EOF字节(-1)吗?我如何让此流程工作?
  3. 我是否错过了netty中简化此过程的构造?我考虑将其实现为解码器,直到整个流都可用才进行解码,但这样我将加载所有内容到内存中。
1个回答

3

好的,事实证明我有一些误解,这些误解解释了为什么我在尝试做某件事时遇到了麻烦。

1)许多文件类型没有所谓的终端字节。实际上,EOF字节(最常见的是-1,因为它是溢出值)通常是读取器提供给其使用者的一种实现,用于表示它们已经到达内容的末尾。它通常不是文件本身存在的东西。

2)channelReadComplete并不像听起来那么简单。在满足netty中配置的最大读取次数(默认为10)或者在读取空缓冲区或接收到小于配置块大小的缓冲区时,channelReadComplete会被调用。

至于为什么似乎输入流复制挂起了,那是因为管道输入流从未生成终端值(这是EOF字节的读取器实现的示例)。只有在驱动它们的输出流关闭后,PipedInputStreams才会指示EOF。

要使此实现工作,我应该将消息计数增加到足够高的数字,并信任channelReadComplete在返回小于块大小的最后一次读取后被调用。此时,关闭和重置输出流以进行下一条消息将是安全的。关闭输出流将导致输入流最终返回EOF字节,然后其他所有内容都可以继续进行。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接