Netty ByteToMessageCodec<ByteBuf> 对消息进行了部分重复解码

3

我正在使用EmbeddedChannel测试我的handlerscodecs来处理以下格式的消息:

 +------------+------------------+----------------+      
 |   Header   |  Payload Length  |    Payload     |
 |  16 bytes  |     2 bytes      |  "Some data"   |     
 +------------+------------------+----------------+

首先,我想实现以下目标:

  1. 创建一个对象来存储头部详细信息,并将解码后的头部对象添加到ChannelHandlerContextAttributeMap中供以后使用;
  2. 等待/检索整个有效载荷数据;
  3. 在最终处理程序上提供Header对象和整个有效负载作为ByteBuf以路由消息。

我使用以下处理程序:

  1. ByteToMessageCodec<ByteBuf> 来提取头部信息并将其添加到属性列表中。
  2. LengthFieldBasedFrameDecoder 读取有效载荷长度并等待/检索整个帧。
  3. SimpleChannelInboundHandler 将使用从属性列表中检索的头部对象相应地路由有效负载。

当将消息传递给ByteToMessageCodecdecode方法时,头部会被正确地处理和提取。然后我继续将Header对象添加到AttributeMap中,并添加ByteBuf(其中readableBytes = 2个字节(有效载荷长度指示器)+有效载荷长度)。

假设有效载荷长度为1020个字节。最初由codec接收的消息将具有readableBytes = 16字节+2字节+1020字节。头部由decode方法读取,然后剩余的可用字节数(1022)将添加到List<Object> out中。

如果我理解正确,剩余的字节现在将被传递给下一个处理程序,即LengthFieldBasedFrameDecoder,它将读取长度指示器并将有效负载(1020个字节)传递给SimpleChannelHanlder,但我一定是错了。

decode方法再次被调用,使用之前添加到List<Object> out中的相同1022个字节。

decode方法的JavaDoc中,有以下内容:

Decode the from one ByteBuf to an other. This method will be called till either the input ByteBuf
has nothing to read when return from this method or till nothing was read from the input ByteBuf.

这是否意味着将一直调用decode,直到readableBytes == 0
传递其余消息给LengthFieldBasedFrameDecoder的最有效方式是什么?
我假设LengthFieldBasedFrameDecoder需要一个ByteBuf作为输入,这是否意味着我需要将readerIndex = 0并向List<Object> out添加一个ByteBuf的副本?
任何帮助/建议/批评都将不胜感激,我想以尽可能简洁的方式完成此操作。
这是我的decode方法:
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    byte [] headerBytes = new byte[HEADER_LENGTH];
    in.readBytes(headerBytes, 0, HEADER_LENGTH);

    Header header = new Header(headerBytes);
    System.out.println("Decoded Header: \n" + header);

    //Set the header attribute so it can be used by routing handlers
    ctx.attr(ChannelAttributes.HEADER).getAndSet(header);
    //pass to next handler
    out.add(in);
}

注意:我正在阅读《Netty in Action MEAP v8》

2个回答

3

这是否意味着decode将被调用直到readableBytes == 0?

基本上是的。一个简化的ByteToMessageDecoder看起来像这样:

while (in.isReadable()) {
    int outputSizeBefore = out.size();
    int readableBytesBefore = in.readableBytes();

    callYourDecodeImpl(ctx, in, out);

    int outputSizeAfter = out.size();
    int readableBytesAfter = in.readableBytes();

    boolean didNotDecodeAnything = outputSizeBefore == outputSizeAfter;
    boolean didNotReadAnything = readableBytesBefore == readableBytesAfter;

    if(didNotDecodeAnything && didNotReadAnything) {
        break;
    }

    // next iteration, continue with decoding
}

因此,您的解码器将不断读取头文件,直到输入缓冲区用尽。

要获得所需的行为,您需要将isSingleDecode标志设置为true:

class MyDecoder extends ByteToMessageDecoder {

    MyDecoder() {
        setSingleDecode(true);
    }

    // your decode impl as before
}

或者

MyDecoder decoder = new MyDecoder();
decoder.setSingleDecode(true);

在你的解码实现解码某些内容后,此方法将停止循环。现在,LengthFieldBasedFrameDecoder将使用你添加到out列表中的ByteBuf进行调用。帧解码按照你所描述的方式工作,无需向列表中添加副本。当负载帧作为msg传递时,SimpleChannelInboundHandler将被调用。

但是,由于每个通道处理程序的ChannelHandlerContext都不同,属性未共享,因此您将无法从AttributeMap中读取标头。

解决此问题的一种方法是使用事件。在你的decoder中,发送Header事件而不是将其添加到AttributeMap中:

// instead of
// ctx.attr(Header.ATTRIBUTE_KEY).getAndSet(header);
// do this
ctx.fireUserEventTriggered(ChannelAttributes.HEADER);

然后,您可以像这样编写您的SimpleChannelInboundHandler

class MyMessageHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private Header header = null;

    MyMessageHandler() {
          super(true);
    }

    @Override
    public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) throws Exception {
        if (evt instanceof Header) {
            header = (Header) evt;
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    protected void channelRead0(final ChannelHandlerContext ctx, final ByteBuf msg) throws Exception {
        if (header != null) {
            System.out.println("header = " + header);
            // continue with header, such as routing...
        }
        header = null;
    }
}

另一种选择是将两个对象都发送到管道中,并使用 ChannelInboundHandlerAdapter 而不是 SimpleChannelInboundHandler。在你的 decoder 中,不要将 Header 添加到 AttributeMap 中,而是添加到 out 中:
// ...
out.add(header);
out.add(in);

接下来,按照以下方式编写您的 ChannelInboundHandler

class MyMessageHandler extends ChannelInboundHandlerAdapter {
    private Header header = null;

    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
        if (msg instanceof Header) {
            header = (Header) msg;
            System.out.println("got the header " + header);
        } else if (msg instanceof ByteBuf) {
            ByteBuf byteBuf = (ByteBuf) msg;
            System.out.println("got the message " + msg);
            try {
                // continue with header, such as routing...
            } finally {
                ReferenceCountUtil.release(msg);
            }
        } else {
            super.channelRead(ctx, msg);
        }
    }
}
LengthFieldBasedFrameDecoder仅忽略不是ByteBuf的消息,因此如果您的Header没有实现ByteBuf,则会顺利通过,并到达您的ChannelInboundHandler。然后,消息将被解码为有效负载帧,并传递给您的ChannelInboundHandler

1
谢谢! setSingleDecode(true) 正是我所需要的。但是当我在实现一个 ByteToMessageCodec 时,我似乎找不到这个方法,只有一个 ByteToMessageDecoder。没关系,我会将编码和解码过程分开处理。感谢提供关于将 header 传递到下一个处理程序的提示。然而,通过将其添加到通道的 AttributeMap 中并进行 ctx.channel().attr(ChannelAttributes.HEADER).set(header) 操作,我成功地设置并检索了 header。这样做有什么不利之处吗? - Ian2thedv
1
啊,是的,setSingleDecode 只存在于 ByteToMessageDecoder 上。至于通道上的属性映射,我看不出任何缺点,这肯定是可行的解决方案。 - knutwalker

0
作为 knutwalker答案的后续:我找到了一种替代方法,适用于那些无法实现setSingleDecode方法的ByteToMessageCodec用户。

通过in.readRetainedSlice()读取字节,如下所示。

protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    byte [] headerBytes = new byte[HEADER_LENGTH];
    in.readBytes(headerBytes, 0, HEADER_LENGTH);

    Header header = new Header(headerBytes);
    System.out.println("Decoded Header: \n" + header);

    //Set the header attribute so it can be used by routing handlers
    ctx.attr(ChannelAttributes.HEADER).getAndSet(header);
    //pass to next handler
    int length = in.readShort();
    out.add(in.readRetainedSlice(length));
}

Ian2thedv关注字节复制的效率,但当readableBytes大于您的消息长度时,您不能只是out.add(in),这是不可避免的。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接