我使用 Netty 写了一个简单的 UDP 服务器,它会将接收到的消息(帧)打印在日志中。为此,我创建了一个简单的帧解码器和一个简单的消息处理程序。我还编写了一个客户端,可以按顺序和/或并行发送多个请求。
当我配置我的客户端测试器来连续发送几百个请求,并且它们之间有小延迟时,我的使用 Netty 编写的服务器可以正确地接收它们。但是,当我增加客户端中同时发送的请求数量(例如100),并与一些重复和顺序请求配合使用时,我的服务器开始丢失许多请求。例如,当我发送50000个请求时,仅使用简单的 ChannelHandler 打印出接收到的消息时,我的服务器仅能接收大约49000个请求。
当我在这个处理程序前添加简单的帧解码器(用于打印出帧并将其复制到另一个缓冲区)时,服务器仅处理了一半的请求!
我注意到,无论我指定了多少个工作线程给创建的 NioDatagramChannelFactory,始终只有一个线程处理请求(我使用了推荐的 Executors.newCachedThreadPool() 作为其他参数)。
我还创建了另一个类似的简单 UDP 服务器,基于 JDK 提供的 DatagramSocket,它完美地处理了每个请求,没有任何丢失!当我在客户端发送50000个请求(例如使用1000个线程)时,我的服务器收到了50000个请求。
我在使用 Netty 配置 UDP 服务器时有做错什么吗?或者 Netty 简单地没有设计来支持这样的负载?为什么给定的 Cached Thread Pool 只使用一个线程(通过 JMX jconsole 查看线程名称和输出日志中的内容发现只有一个线程并且始终相同)?我认为如果使用更多的线程,服务器将能够轻松处理这样的负载,因为当我不使用 Netty 时,可以毫无问题地处理它!
以下是我的初始化代码:
当我配置我的客户端测试器来连续发送几百个请求,并且它们之间有小延迟时,我的使用 Netty 编写的服务器可以正确地接收它们。但是,当我增加客户端中同时发送的请求数量(例如100),并与一些重复和顺序请求配合使用时,我的服务器开始丢失许多请求。例如,当我发送50000个请求时,仅使用简单的 ChannelHandler 打印出接收到的消息时,我的服务器仅能接收大约49000个请求。
当我在这个处理程序前添加简单的帧解码器(用于打印出帧并将其复制到另一个缓冲区)时,服务器仅处理了一半的请求!
我注意到,无论我指定了多少个工作线程给创建的 NioDatagramChannelFactory,始终只有一个线程处理请求(我使用了推荐的 Executors.newCachedThreadPool() 作为其他参数)。
我还创建了另一个类似的简单 UDP 服务器,基于 JDK 提供的 DatagramSocket,它完美地处理了每个请求,没有任何丢失!当我在客户端发送50000个请求(例如使用1000个线程)时,我的服务器收到了50000个请求。
我在使用 Netty 配置 UDP 服务器时有做错什么吗?或者 Netty 简单地没有设计来支持这样的负载?为什么给定的 Cached Thread Pool 只使用一个线程(通过 JMX jconsole 查看线程名称和输出日志中的内容发现只有一个线程并且始终相同)?我认为如果使用更多的线程,服务器将能够轻松处理这样的负载,因为当我不使用 Netty 时,可以毫无问题地处理它!
以下是我的初始化代码:
...
lChannelfactory = new NioDatagramChannelFactory( Executors.newCachedThreadPool(), nbrWorkers );
lBootstrap = new ConnectionlessBootstrap( lChannelfactory );
lBootstrap.setPipelineFactory( new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline()
{
ChannelPipeline lChannelPipeline = Channels.pipeline();
lChannelPipeline.addLast( "Simple UDP Frame Dump DECODER", new SimpleUDPPacketDumpDecoder( null ) );
lChannelPipeline.addLast( "Simple UDP Frame Dump HANDLER", new SimpleUDPPacketDumpChannelHandler( lOuterFrameStatsCollector ) );
return lChannelPipeline;
}
} );
bindChannel = lBootstrap.bind( socketAddress );
...
我的解码器中 decode() 方法的内容:
protected Object decode(ChannelHandlerContext iCtx, Channel iChannel, ChannelBuffer iBuffer) throws Exception
{
ChannelBuffer lDuplicatedChannelBuffer = null;
sLogger.debug( "Decode method called." );
if ( iBuffer.readableBytes() < 8 ) return null;
if ( outerFrameStatsCollector != null ) outerFrameStatsCollector.incrementNbrRequests();
if ( iBuffer.readable() )
{
sLogger.debug( convertToAsciiHex( iBuffer.array(), iBuffer.readableBytes() ) );
lDuplicatedChannelBuffer = ChannelBuffers.dynamicBuffer( iBuffer.readableBytes() );
iBuffer.readBytes( lDuplicatedChannelBuffer );
}
return lDuplicatedChannelBuffer;
}
我的处理程序的messageReceived()方法中的内容:
public void messageReceived(final ChannelHandlerContext iChannelHandlerContext, final MessageEvent iMessageEvent) throws Exception
{
ChannelBuffer lMessageBuffer = (ChannelBuffer) iMessageEvent.getMessage();
if ( outerFrameStatsCollector != null ) outerFrameStatsCollector.incrementNbrRequests();
if ( lMessageBuffer.readable() )
{
sLogger.debug( convertToAsciiHex( lMessageBuffer.array(), lMessageBuffer.readableBytes() ) );
lMessageBuffer.discardReadBytes();
}
}