使用Netty实现基于WebSockets的MQTT?

5

我希望使用基于Websockets的MQTT协议。在Netty中,使用Websockets非常容易:

ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("codec-http", new HttpServerCodec());
pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
pipeline.addLast("handler", new WebSocketServerHandler());

我找到了一个基于Netty的MQTT代理(moquette)。
 NettyMQTTHandler handler = new NettyMQTTHandler();
 ServerBootstrap b = new ServerBootstrap();
            b.group(m_bossGroup, m_workerGroup)
             .channel(NioServerSocketChannel.class) 
             .childHandler(new ChannelInitializer<SocketChannel>() { 
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    //pipeline.addFirst("metrics", new BytesMetricsHandler(m_metricsCollector));
                    pipeline.addFirst("idleStateHandler", new IdleStateHandler(0, 0, Constants.DEFAULT_CONNECT_TIMEOUT));
                    pipeline.addAfter("idleStateHandler", "idleEventHandler", new MoquetteIdleTimoutHandler());
                    //pipeline.addLast("logger", new LoggingHandler("Netty", LogLevel.ERROR));
                    pipeline.addLast("decoder", new MQTTDecoder());
                    pipeline.addLast("encoder", new MQTTEncoder());
                    pipeline.addLast("metrics", new MessageMetricsHandler(m_metricsCollector));
                    pipeline.addLast("handler", handler);
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)
             .option(ChannelOption.SO_REUSEADDR, true)
             .childOption(ChannelOption.SO_KEEPALIVE, true); 

根据理论,我应该能够通过Websocket发送MQTT,但我不知道在Netty上是否可行?有没有人有任何线索或想法如何做到这一点?我应该使用MessageToMessageCodec和BinaryWebSocketFrame吗?

干杯!

1个回答

7
假设你的MQTTDecoder消耗ByteBuf并生成一些MQTT消息对象,而MQTTEncoder则相反,这通常是情况。
然后,您的编解码器使用的ByteBuf不是WebSocket消息。它们需要成为Web Socket帧的有效负载。我会将以下处理程序插入到管道中:
1. 一个MessageToMessageDecoder,将WebSocket文本(或二进制)帧转换为ByteBuf,以便MQTTDecoder可以使用它。转换非常简单-只需获取Web Socket帧的内容。 2. 一个MessageToMessageEncoder,将ByteBuf转换为Web Socket文本(或二进制)帧,以便Netty的WebSocketFrameEncoder可以使用它。转换也非常简单-只需使用MQTTEncoder编码的ByteBuf包装一个Web Socket帧对象。
最终的管道将如下所示:
1. HttpResponseEncoder 2. HttpRequestDecoder 3. HttpObjectAggregator(65536) 4. WebSocketServerProtocolHandler("/your-websocket-endpoint-path") 5. WebSocketFrameToByteBufDecoder扩展自MessageToMessageDecoder 6. ByteBufToWebSocketFrameEncoder扩展自MessageToMessageEncoder 7. MQTTEncoder 8. MQTTDecoder 9. MessageMetricsHandler 10. 处理程序
WebSocketServerProtocolHandler将对您的Web套接字客户端执行必要的握手,并在WebSocketFrameToByteBufDecoder之前插入WebSocketFrameEncoder和WebSocketFrameDecoder。成功握手后的结果管道如下所示:
1. WebSocketFrameEncoder 2. WebSocketFrameDecoder 3. WebSocketFrameToByteBufDecoder扩展自MessageToMessageDecoder 4. ByteBufToWebSocketFrameEncoder扩展自MessageToMessageEncoder 5. MQTTEncoder 6. MQTTDecoder 7. MessageMetricsHandler 8. 处理程序

谢谢你的提示。我只是将MQTTDecoder和MQTTEncoder更改为MessageToMessageDecoder / MessageToMessageEncoder,现在它可以完美运行了 ;-)代码:https://code.google.com/p/moquette-mqtt/issues/attachmentText?id=37&aid=370006000&name=NettyAcceptor.java&token=mb3cYVwbJqfgqAU19vC3QjEqjIw%3A1395163623412 - radzio
是的,如果您对编解码器实现有完全控制权,甚至可以这样做。 - trustin

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接