Sending a file from server to client using Netty.io

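I'm trying to send a file that the client requests from the server to the client. The client specifies the file in a FileRequestProtocol and sends it to the server; the server adds the file size to the FileRequestProtocol and returns it to the client.
The client then adds a new FileChunkReqWriteHandler with the correct file size to its pipeline.
The server creates a new ChunkedFileServerHandler with the context and the requested file and tries to send it, but the FileChunkReqWriteHandler never reads any bytes from the channel.
What am I doing wrong here?
Log: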
INFO  ProtocolHeadHandler:48 - Client send ProtocolHead [version=1, jobType=FILEREQUEST]
INFO  ProtocolHeadServerHandler:36 - Server receive ProtocolHead [version=1, jobType=FILEREQUEST]
INFO  ProtocolHeadHandler:57 - Client ProtocolHead equals, Send Protocol FileRequestProtocol [filePath=test.jpg, fileSize=0]
INFO  FileRequestServerHandler:42 - Server new FileRequest FileRequestProtocol [filePath=test.jpg, fileSize=0]
INFO  FileRequestHandler:41 - Client receives FileRequestProtocol [filePath=test.jpg, fileSize=174878]
INFO  ChunkedFileServerHandler:39 - New ChunkedFileServerHandler
INFO  FileChunkReqWriteHandler:20 - New ChunkedFile Handler FileRequestProtocol [filePath=test.jpg, fileSize=174878]

Client

FileRequestHandler.java

public class FileRequestHandler extends
    SimpleChannelInboundHandler<FileRequestProtocol> {

private Logger logger = Logger.getLogger(this.getClass());

public FileRequestHandler() {
}

@Override
public void channelRead0(ChannelHandlerContext ctx, FileRequestProtocol msg) {
    logger.info("Client receives " + msg);
    ReferenceCountUtil.release(msg);
    ctx.channel().pipeline().addLast(new FileChunkReqWriteHandler(msg));
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
    logger.info("Client read complete");
    ctx.flush();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    cause.printStackTrace();
    ctx.close();
}
}

FileChunkReqWriteHandler.java

public class FileChunkReqWriteHandler extends SimpleChannelInboundHandler<ChunkedFile> {

FileRequestProtocol fileRequestProtocol;
private Logger logger = Logger.getLogger(this.getClass());


public FileChunkReqWriteHandler(FileRequestProtocol msg) {
    this.fileRequestProtocol = msg;
    logger.info("New ChunkedFile Handler " + msg);
}

@Override
public void channelActive(ChannelHandlerContext ctx) {
    logger.info("in channel active method");
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    cause.printStackTrace();

    if (ctx.channel().isActive()) {
        ctx.writeAndFlush("ERR: " +
                cause.getClass().getSimpleName() + ": " +
                cause.getMessage() + '\n').addListener(ChannelFutureListener.CLOSE);
    }
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, ChunkedFile msg)
        throws Exception {
    logger.info("in channelRead0");

}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    logger.info("channelRead");
    ByteBuf buf = (ByteBuf) msg;
    byte[] bytes = new byte[buf.readableBytes()];
    buf.readBytes(bytes);
    if(buf.readableBytes() >= this.fileRequestProtocol.getFileSize())
    {
        logger.info("received all data");
    }
}
}

Server

FileRequestServerHandler.java

public class FileRequestServerHandler extends
    SimpleChannelInboundHandler<FileRequestProtocol> {

private File f;
private Logger logger = Logger.getLogger(this.getClass());

@Override
public void channelRead0(ChannelHandlerContext ctx, FileRequestProtocol fileRequest) {
    logger.info("Server new FileRequest " + fileRequest);
    f = new File(fileRequest.getFilePath());
    fileRequest.setFileSize(f.length());
    ctx.writeAndFlush(fileRequest);

    new ChunkedFileServerHandler(ctx,f);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
    logger.info("Server read complete");

}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    cause.printStackTrace();
    ctx.close();
}
}

ChunkedFileServerHandler.java

public class ChunkedFileServerHandler extends ChunkedWriteHandler {

private Logger logger = Logger.getLogger(this.getClass());

private File file;
public ChunkedFileServerHandler(ChannelHandlerContext ctx, File file) {
    this.file = file;

    logger.info("New ChunkedFileServerHandler");
    ChunkedFile chunkedFile;
    try {
        chunkedFile = new ChunkedFile(this.file);
        ctx.writeAndFlush(chunkedFile);
        ctx.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
}


@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    super.channelActive(ctx);
    logger.info("FILE WRITE GETS ACTIVE");
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    cause.printStackTrace();
    ctx.close();
}
}

Update

public class ServerInitializer extends ChannelInitializer<SocketChannel> {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline p = ch.pipeline();

    p.addLast("encoder", new ObjectEncoder());
    p.addLast("decoder",
            new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
    p.addLast("protocolhead", new ProtocolHeadServerHandler());
    p.addLast("filerequestserverhandler", new FileRequestServerHandler());
    p.addLast("chunkedfileserver", new ChunkedFileServerHandler());

}

}

Server startup

public void startUp()
{
        bossGroup = new NioEventLoopGroup(1);
        workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ServerInitializer());

            // Bind and start to accept incoming connections.
            b.bind(this.port).sync().channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
}

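Can you show your pipeline configuration? For instance, do you have a ChunkedWriteHandler in your server-side pipeline? - Frederic Brégier
I've updated the question. The "ChunkedFileServerHandler" is in the pipeline. - jussi
1 Answer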


I see two issues:

1) You should not create a new handler inside your own handler; instead, create the ChunkedFile directly and write it:

public class FileRequestServerHandler extends
    SimpleChannelInboundHandler<FileRequestProtocol> {

    private File f;
    private Logger logger = Logger.getLogger(this.getClass());

    @Override
    public void channelRead0(ChannelHandlerContext ctx, FileRequestProtocol fileRequest)
            throws IOException {
        logger.info("Server new FileRequest " + fileRequest);
        f = new File(fileRequest.getFilePath());
        fileRequest.setFileSize(f.length());
        ctx.writeAndFlush(fileRequest);

        // Create the ChunkedFile right here instead of creating a sub-handler.
        // The constructor can throw IOException, hence the throws clause above.
        ChunkedFile chunkedFile = new ChunkedFile(f);
        // Writing a ChunkedFile needs a ChunkedWriteHandler in the pipeline (see point 2).
        ctx.writeAndFlush(chunkedFile);
        // Don't create a handler like this: new ChunkedFileServerHandler(ctx, f);
    }
}

2) Since you are writing a ChunkedInput (here a ChunkedFile), you must place a ChunkedWriteHandler in the pipeline before your handler, so your initializer could look like this:

public class ServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline p = ch.pipeline();

        p.addLast("encoder", new ObjectEncoder());
        p.addLast("decoder",
            new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
        p.addLast("chunkedWriteHandler", new ChunkedWriteHandler());// added
        p.addLast("protocolhead", new ProtocolHeadServerHandler());
        p.addLast("filerequestserverhandler", new FileRequestServerHandler());
        // removed: p.addLast("chunkedfileserver", new ChunkedFileServerHandler());
    }
}
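The position of the ChunkedWriteHandler can vary, but it must always come before the custom handler that writes the ChunkedFile.

3) Finally, check your encoder/decoder (ObjectEncoder/ObjectDecoder), since I'm not 100% sure they can cooperate with ByteBufs being written from or read into a file. It might work, or it might not...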


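Thanks, it looks like the server is sending the file now, but the client can't decode it correctly, as you mentioned in point 3. I get 'io.netty.handler.codec.TooLongFrameException: Adjusted frame length exceeds 1048576: 4292411364 - discarded'. Do you have some keywords to get me started? - jussi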
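The number in that exception is consistent with the first bytes of the file being read as a frame length. A JFIF-style JPEG begins with the bytes FF D8 FF E0; read as an unsigned big-endian 32-bit integer that is 4292411360, and adding the 4 bytes of the length field itself gives 4292411364. The sketch below reproduces that arithmetic with plain JDK classes. It assumes that test.jpg starts with those four bytes and that the decoder treats the first four bytes of each frame as a length with a 1048576-byte limit; the class and method names are made up for illustration.

```java
import java.nio.ByteBuffer;

class FrameLengthDemo {
    // Limit taken from the exception message above.
    static final long MAX_FRAME_LENGTH = 1048576L;
    // Assumed size of the length prefix that the decoder reads first.
    static final int LENGTH_FIELD_SIZE = 4;

    // Interprets the first four bytes as an unsigned big-endian length and
    // adds the size of the length field itself (the "adjusted" length).
    static long adjustedFrameLength(byte[] firstBytes) {
        long unsigned = ByteBuffer.wrap(firstBytes).getInt() & 0xFFFFFFFFL;
        return unsigned + LENGTH_FIELD_SIZE;
    }

    public static void main(String[] args) {
        // Assumption: test.jpg starts with the JPEG SOI and APP0 markers.
        byte[] jpegStart = {(byte) 0xFF, (byte) 0xD8, (byte) 0xFF, (byte) 0xE0};
        long adjusted = adjustedFrameLength(jpegStart);
        // prints "4292411364 exceeds limit: true"
        System.out.println(adjusted + " exceeds limit: " + (adjusted > MAX_FRAME_LENGTH));
    }
}
```

If that reading is right, the raw file bytes are reaching a decoder that expects length-prefixed frames, which matches the concern in point 3.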
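My suggestion is to handle both kinds of traffic in your own codec. You need to manage something like a header followed by the file transfer, much as HTTP does, so that you use your own codec instead of ObjectDecoder/ObjectEncoder. For example, look at LengthFieldBasedFrameDecoder: a first byte that marks a header, then the length of the message needed to build the FileRequestProtocol object, then the content, made up of fileSize bytes and caught by a ByteBuf-based FileChunkReqWriteHandler rather than a ChunkedFile-based one (taking advantage of SimpleChannelInboundHandler's ability to catch only what it should). - Frederic Brégier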
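A minimal sketch of such a wire format, using only JDK streams rather than Netty handlers: one type byte, a 4-byte header length, the header itself, then exactly fileSize raw bytes. The names FramingSketch and TYPE_FILE_RESPONSE, and the choice to encode the header as an 8-byte file size followed by the UTF-8 path, are assumptions for illustration and not part of the original protocol. The reader keeps a running total of received bytes, because a single read rarely delivers the whole file.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

class FramingSketch {
    // Hypothetical marker byte that identifies a file-response header.
    static final byte TYPE_FILE_RESPONSE = 1;

    // Writes: type byte, header length, header (file size + path), raw file bytes.
    static byte[] encode(String path, byte[] fileBytes) throws IOException {
        byte[] pathBytes = path.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeByte(TYPE_FILE_RESPONSE);
        out.writeInt(Long.BYTES + pathBytes.length); // header length
        out.writeLong(fileBytes.length);             // file size
        out.write(pathBytes);                        // file path
        out.write(fileBytes);                        // file content, not framed
        out.flush();
        return bytes.toByteArray();
    }

    // Reads the header, then collects chunks until fileSize bytes have arrived.
    static byte[] decode(byte[] wire, int chunkSize) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire));
        if (in.readByte() != TYPE_FILE_RESPONSE) {
            throw new IOException("unexpected message type");
        }
        int headerLength = in.readInt();
        long fileSize = in.readLong();
        byte[] pathBytes = new byte[headerLength - Long.BYTES];
        in.readFully(pathBytes);

        ByteArrayOutputStream file = new ByteArrayOutputStream();
        byte[] chunk = new byte[chunkSize];
        long received = 0;
        while (received < fileSize) {
            int wanted = (int) Math.min(chunk.length, fileSize - received);
            int n = in.read(chunk, 0, wanted);
            if (n < 0) {
                throw new IOException("stream ended after " + received + " bytes");
            }
            file.write(chunk, 0, n);
            received += n;
        }
        return file.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] content = new byte[10_000];
        for (int i = 0; i < content.length; i++) {
            content[i] = (byte) i;
        }
        byte[] wire = encode("test.jpg", content);
        byte[] decoded = decode(wire, 4096);
        System.out.println("received " + decoded.length + " of " + content.length + " bytes");
    }
}
```

In Netty the same layout would be parsed by a custom decoder in the client pipeline instead of ObjectDecoder; that mapping is not shown here.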
