Netty每个UDP数据报文不同的管道

3
我们有一个已经实现了TCP/IP协议的服务器,但现在我们需要支持UDP协议的要求。
每个发送的UDP数据报都包含我需要解码的所有内容,因此它是一个非常简单的请求和响应系统,数据在数据报中通过换行符分隔。
服务器启动时引导程序的代码如下所示:
    //SETUP UDP SERVER
    DatagramChannelFactory udpFactory = new NioDatagramChannelFactory(Executors.newCachedThreadPool());

    ConnectionlessBootstrap udpBootstrap = new ConnectionlessBootstrap(udpFactory);

    udpBootstrap.setOption("sendBufferSize", 65536);
    udpBootstrap.setOption("receiveBufferSize", 65536);
    udpBootstrap.setOption("receiveBufferSizePredictorFactory", new AdaptiveReceiveBufferSizePredictorFactory());

    udpBootstrap.setOption("broadcast", "true");
    udpBootstrap.setPipelineFactory(new ServerPipeLineFactoryUDP());
    udpBootstrap.bind(new InetSocketAddress(hostIp, 4000)); 

管道代码:

class ServerPipeLineFactoryUDP implements ChannelPipelineFactory
{

    private final static ExecutionHandler EXECUTION_HANDLER = new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor(ScorpionFMS.THREAD_POOL_COUNT, 0, 0));

    public ServerPipeLineFactoryUDP()
    {

    }

    @Override
    public ChannelPipeline getPipeline() throws Exception
    {

    ChannelPipeline pipeline = pipeline();
    pipeline.addLast("debugup", new DebugUpstreamHandler("UDP"));
    pipeline.addLast("debugdown", new DebugDownstreamHandler("UDP"));

    pipeline.addLast("framer", new DelimiterBasedFrameDecoder(256, Delimiters.lineDelimiter()));

    pipeline.addLast("decoder", new UDPRequestDecoder(true));
    pipeline.addLast("encoder", new StringEncoder());
    pipeline.addLast("executor", EXECUTION_HANDLER);
    pipeline.addLast("handler", new UDPRequestHandler(

    return pipeline;
    }
}

我遇到的问题是每个数据报都使用相同的管道实例(我希望每个数据报都使用新的管道实例),因此在处理数据报内容时存储的所有状态都会被保存,下一个数据报也会使用它(而对于TCP每个连接都有自己的通道,因此有自己的管道实例和状态)。
我知道这是从文档中阅读到的预期行为,但是否有办法强制Netty为每个数据报重新创建管道?还是我完全错了?
简而言之,我希望每个数据报都有一个新的管道实例(与TCP相同)。

为什么在处理程序中要“存储状态”?针对你所使用的这种消息传递方式,是否需要连接的概念?一开始似乎不需要,但后来似乎改变了想法(如果不需要,为什么要存储它)... - MartinK
2个回答

6

就像我在IRC中说的那样,我认为这可以做到你想要的事情,或者至少给你一些想法。

public class Example {

    public static void main(String[] args) {
        final ChannelPipelineHandlerImpl perDatagramFactory = new ChannelPipelineHandlerImpl();

        DatagramChannelFactory udpFactory = new NioDatagramChannelFactory(Executors.newCachedThreadPool());

        ConnectionlessBootstrap udpBootstrap = new ConnectionlessBootstrap(udpFactory);

        udpBootstrap.setPipelineFactory(new ChannelPipelineFactory() {

            public ChannelPipeline getPipeline() throws Exception {
                return Channels.pipeline(new DistinctChannelPipelineHandler(perDatagramFactory));
            }
        });

    }

    private static final class DistinctChannelPipelineHandler implements ChannelDownstreamHandler, ChannelUpstreamHandler {
        private ChannelPipelineFactory factory;

        public DistinctChannelPipelineHandler(ChannelPipelineFactory factory) {
            this.factory = factory;
        }

        public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
            ChannelPipeline pipeline = factory.getPipeline();
            pipeline.attach(ctx.getChannel(), ctx.getPipeline().getSink());
            pipeline.sendUpstream(e);

            ctx.sendUpstream(e);

        }

        public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
            ChannelPipeline pipeline = factory.getPipeline();
            pipeline.attach(ctx.getChannel(), ctx.getPipeline().getSink());
            pipeline.sendDownstream(e);

            ctx.sendDownstream(e);
        }

    }

    private static final class ChannelPipelineHandlerImpl implements ChannelPipelineFactory {

        public ChannelPipeline getPipeline() throws Exception {
            // Add your handlers here
            return Channels.pipeline();
        }

    }
}

经过一些更改,它正是我所需要的。再次感谢!现在TCP/IP和UDP已经完美地并行运行了。 - Ciaran Fisher
3
在Netty 4中是否有类似的例子?我发现这非常有帮助! - Abe
@norman-maurer,如上评论所要求的那样,您能否在Netty 4中提供一个示例?我发现https://github.com/Shevchik/UdpServerSocketChannel很有用。它还为每个远程地址分配了单独的通道。在Netty 4中采用的方法是UdpServerSocketChannel项目中推荐的方法,还是您建议的其他方法? - garnet

0

我不确定UDP通道是如何处理的,但如果每个数据报的通道都是独立的,你可以将状态存储在ChannelLocal中。


不幸的是,问题在于通道在每个数据报中并不明显,这就是我正在努力实现的。 - Ciaran Fisher

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接