我正在尝试实现一个“缩略图生成器”作为微服务。我认为这样的东西最好作为TCP服务器,因此在简要调查了几个选项后,我选择了Netty。为了使服务尽可能节省内存,我希望避免将完整的图像加载到内存中,因此一直在尝试构建一个管道,其中“ThumbnailHandler”可以使用管道流利用Netty的分块读取,以便随着Netty接收更多字节,缩略图生成器可以遍历更多的流。不幸的是,我对Netty或NIO模式不太熟悉,不知道是否采取了最佳方法,而且我甚至无法让简化版本按照我的期望工作。
这是我的服务器设置:
public class ThumbnailerServer {
private int port;
public ThumbnailerServer(int port) {
this.port = port;
}
public void run() throws Exception {
final ThreadFactory acceptFactory = new DefaultThreadFactory("accept");
final ThreadFactory connectFactory = new DefaultThreadFactory("connect");
final NioEventLoopGroup acceptGroup = new NioEventLoopGroup(1, acceptFactory, NioUdtProvider.BYTE_PROVIDER);
final NioEventLoopGroup connectGroup = new NioEventLoopGroup(0, connectFactory, NioUdtProvider.BYTE_PROVIDER);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(acceptGroup, connectGroup)
.channelFactory(NioUdtProvider.BYTE_ACCEPTOR)
.option(ChannelOption.SO_BACKLOG, 128)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<UdtChannel>() {
@Override
public void initChannel(UdtChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast("handler", new ThumbnailerServerHandler());
}
});
// bind and start to accept incoming connections.
b.bind(port).sync().channel().closeFuture().sync();
} finally {
connectGroup.shutdownGracefully();
acceptGroup.shutdownGracefully();
}
}
}
还有缩略图处理程序:
public class ThumbnailerServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
private static final Logger logger = LoggerFactory.getLogger(ThumbnailerServerHandler.class);
private PipedInputStream toThumbnailer = new PipedInputStream();
private PipedOutputStream fromClient = new PipedOutputStream(toThumbnailer);
private static final ListeningExecutorService executor = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(5));
private ListenableFuture<OutputStream> future;
public ThumbnailerServerHandler() throws IOException {
super(ByteBuf.class, true);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
future = executor.submit(() -> ThumbnailGenerator.generate(toThumbnailer));
future.addListener(() -> {
try {
ctx.writeAndFlush(future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}, executor);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
this.fromClient.close();
this.toThumbnailer.close();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
int readableBytes = msg.readableBytes();
msg.readBytes(this.fromClient, readableBytes);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("Encountered error during communication", cause);
ctx.close();
}
}
在我完全掌握整个流程之前,这是我简化的“缩略图生成器”:
public class ThumbnailGenerator {
public static OutputStream generate(InputStream toThumbnailer) {
OutputStream stream = new ByteArrayOutputStream();
try {
IOUtils.copy(toThumbnailer, stream);
} catch (IOException e) {
e.printStackTrace();
}
return stream;
}
}
- 在handlerAdded方法中启动异步任务是否合适?有没有更“netty”的做法?
- IOUtils.copy阻塞式读取(由于管道输入流的读取)直到有可读数据,因此我将其转移到执行程序池中,因为如果我要继续接收字节,则不能在处理程序中阻塞。但是,我发现这个过程永远不会完成,但它确实在进展。那是因为我从未遇到EOF字节(-1)吗?我如何让此流程工作?
- 我是否错过了netty中简化此过程的构造?我考虑将其实现为解码器,直到整个流都可用才进行解码,但这样我将加载所有内容到内存中。