使用Netty的UDP服务器丢失大量UDP请求

6
我使用 Netty 写了一个简单的 UDP 服务器,它会将接收到的消息(帧)打印在日志中。为此,我创建了一个简单的帧解码器和一个简单的消息处理程序。我还编写了一个客户端,可以按顺序和/或并行发送多个请求。
当我配置我的客户端测试器来连续发送几百个请求,并且它们之间有小延迟时,我的使用 Netty 编写的服务器可以正确地接收它们。但是,当我增加客户端中同时发送的请求数量(例如100),并与一些重复和顺序请求配合使用时,我的服务器开始丢失许多请求。例如,当我发送50000个请求时,仅使用简单的 ChannelHandler 打印出接收到的消息时,我的服务器仅能接收大约49000个请求。
当我在这个处理程序前添加简单的帧解码器(用于打印出帧并将其复制到另一个缓冲区)时,服务器仅处理了一半的请求!
我注意到,无论我指定了多少个工作线程给创建的 NioDatagramChannelFactory,始终只有一个线程处理请求(我使用了推荐的 Executors.newCachedThreadPool() 作为其他参数)。
我还创建了另一个类似的简单 UDP 服务器,基于 JDK 提供的 DatagramSocket,它完美地处理了每个请求,没有任何丢失!当我在客户端发送50000个请求(例如使用1000个线程)时,我的服务器收到了50000个请求。
我在使用 Netty 配置 UDP 服务器时有做错什么吗?或者 Netty 简单地没有设计来支持这样的负载?为什么给定的 Cached Thread Pool 只使用一个线程(通过 JMX jconsole 查看线程名称和输出日志中的内容发现只有一个线程并且始终相同)?我认为如果使用更多的线程,服务器将能够轻松处理这样的负载,因为当我不使用 Netty 时,可以毫无问题地处理它!
以下是我的初始化代码:
...

lChannelfactory = new NioDatagramChannelFactory( Executors.newCachedThreadPool(), nbrWorkers );
lBootstrap = new ConnectionlessBootstrap( lChannelfactory );

lBootstrap.setPipelineFactory( new ChannelPipelineFactory() {
    @Override
    public ChannelPipeline getPipeline()
    {
        ChannelPipeline lChannelPipeline = Channels.pipeline();
        lChannelPipeline.addLast( "Simple UDP Frame Dump DECODER", new SimpleUDPPacketDumpDecoder( null ) );            
        lChannelPipeline.addLast( "Simple UDP Frame Dump HANDLER", new SimpleUDPPacketDumpChannelHandler( lOuterFrameStatsCollector ) );            
        return lChannelPipeline;
    }
} );

bindChannel = lBootstrap.bind( socketAddress );

...

我的解码器中 decode() 方法的内容:

protected Object decode(ChannelHandlerContext iCtx, Channel iChannel, ChannelBuffer iBuffer) throws Exception
{
    ChannelBuffer lDuplicatedChannelBuffer = null;
    sLogger.debug( "Decode method called." );

    if ( iBuffer.readableBytes() < 8 ) return null;
    if ( outerFrameStatsCollector != null ) outerFrameStatsCollector.incrementNbrRequests();

    if ( iBuffer.readable() ) 
    {        
        sLogger.debug( convertToAsciiHex( iBuffer.array(), iBuffer.readableBytes() ) );                     
        lDuplicatedChannelBuffer = ChannelBuffers.dynamicBuffer( iBuffer.readableBytes() );            
        iBuffer.readBytes( lDuplicatedChannelBuffer );
    }

    return lDuplicatedChannelBuffer;
}

我的处理程序的messageReceived()方法中的内容:

public void messageReceived(final ChannelHandlerContext iChannelHandlerContext, final MessageEvent iMessageEvent) throws Exception
{
    ChannelBuffer lMessageBuffer = (ChannelBuffer) iMessageEvent.getMessage();
    if ( outerFrameStatsCollector != null ) outerFrameStatsCollector.incrementNbrRequests();

    if ( lMessageBuffer.readable() ) 
    {        
        sLogger.debug( convertToAsciiHex( lMessageBuffer.array(), lMessageBuffer.readableBytes() ) );            
        lMessageBuffer.discardReadBytes();
    }
}

你知道UDP没有传递保证,对吧? - Chris Shain
是的,我知道没有这样的交付保证,但我的负载测试是在本地完成的,我使用DatagramSocket而不是Netty工具来使用我的简单服务器,并且我目前正在使用WireShark分析请求以验证一种情况中没有丢失任何内容(没有使用Netty),并且使用Netty时数据包会丢失。 - The4Summers
如果您发现Netty丢失了数据包,那么可能是因为Netty没有快速处理传入的数据包,或者您没有及时处理,导致套接字接收缓冲区已满,进而导致传入的数据包被丢弃。您可以加快接收速度或减慢发送速度来解决这个问题。 - user207421
1个回答

7

您没有正确配置 ConnectionlessBootstrap 实例。

  1. You have to configure followings with optimum values.

    SO_SNDBUF size, SO_RCVBUF size and a ReceiveBufferSizePredictorFactory

    lBootstrap.setOption("sendBufferSize", 1048576);
    
    lBootstrap.setOption("receiveBufferSize", 1048576);
    
    lBootstrap.setOption("receiveBufferSizePredictorFactory", 
     new AdaptiveReceiveBufferSizePredictorFactory(MIN_SIZE, INITIAL_SIZE, MAX_SIZE));
    

    check DefaultNioDatagramChannelConfig class for more details.

  2. The pipeline is doing everything using the Netty work thread. If worker thread is overloaded, it will delay the selector event loop execution and there will be a bottleneck in reading/writing the channel. You have to add a execution handler as following in the pipeline. It will free the worker thread to do its own work.

    ChannelPipeline lChannelPipeline = Channels.pipeline();
    
    lChannelPipeline.addFirst("execution-handler", new ExecutionHandler(
      new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576));
    
    //add rest of the handlers here
    

然而,我仍然不理解为什么在实例化NioDatagramChannelFactory类时必须指定线程池执行器和工作线程数,即使它们从未被使用!!? - The4Summers
很高兴它对你有用。线程池执行器用于在NioDatagramChannel管道中创建工作线程。一个工作线程可以被分配到一个DatagramChannel上进行非阻塞读/写操作。如果您的应用程序正在监听多个端口,则会创建许多工作线程(默认值为cpu大小*2),并以轮询方式分配给DatagramChannel。如果您想要更多的工作线程,可以在NioDatagramChannelFactory构造函数中指定。 - Jestan Nirojan
我还是无法相信Netty应用程序会丢失数据包!你确定你的解码器正常工作吗?为什么不使用Netty提供的帧解码器(FixedLengthFrameDecoder、LengthFieldBasedFrameDecoder等)并在处理程序中计算消息数量呢? - Jestan Nirojan
这行代码正确吗?如果 (iBuffer.readableBytes() < 8) 返回 null; - Jestan Nirojan
最后,添加这些缓冲区大小选项有所帮助,但我仍然会丢失帧。我的实际实现比我在这个问题中展示的示例更复杂。我们已经在UDP上实现了自己的协议,并且我们有一个特殊的解码器来验证我们是否有完整和有效的帧(前导、命令、长度、crc等),并且最小帧长度为8字节(这就是为什么在我的示例中我保留了这个验证)。是的,我可以尝试使用fixedLengthFrameDecoder,因为我知道我的帧的最大大小为1088字节。 - The4Summers
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接