使用WriteTimeoutHandler在Netty中实现保持连接消息

12
我正在使用Netty 3.2.7。我试图在客户端中编写功能,以便在一定时间(例如30秒)后没有写入任何消息时向服务器发送“保持活动”消息。
经过一些研究,我发现WriteTimeoutHandler应该让我做到这一点。我在这里找到了解释: https://issues.jboss.org/browse/NETTY-79
Netty文档中给出的示例是:
public ChannelPipeline getPipeline() {
     // An example configuration that implements 30-second write timeout:
     return Channels.pipeline(
         new WriteTimeoutHandler(timer, 30), // timer must be shared.
         new MyHandler());
 }

在我的测试客户端中,我已经完成了这个任务。在MyHandler中,我还重写了exceptionCaught()方法:

public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
    if (e.getCause() instanceof WriteTimeoutException) {
        log.info("Client sending keep alive!");
        ChannelBuffer keepAlive = ChannelBuffers.buffer(KEEP_ALIVE_MSG_STR.length());
        keepAlive.writeBytes(KEEP_ALIVE_MSG_STR.getBytes());
        Channels.write(ctx, Channels.future(e.getChannel()), keepAlive);
    }
}
无论客户端通道多久没有写入任何内容,我已经重写的exceptionCaught()方法都不会被调用。
查看WriteTimeoutHandler的源代码,它的writeRequested()实现如下:
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e)
        throws Exception {

    long timeoutMillis = getTimeoutMillis(e);
    if (timeoutMillis > 0) {
        // Set timeout only when getTimeoutMillis() returns a positive value.
        ChannelFuture future = e.getFuture();
        final Timeout timeout = timer.newTimeout(
                new WriteTimeoutTask(ctx, future),
                timeoutMillis, TimeUnit.MILLISECONDS);

        future.addListener(new TimeoutCanceller(timeout));
    }

    super.writeRequested(ctx, e);
}

在这里,实现的意思似乎是:“当请求写入时,创建一个新的超时。当写入成功时,取消超时。”

使用调试器,看起来确实是这样的情况。一旦写入完成,超时就被取消了。但这不是我想要的行为。我想要的行为是:“如果客户端在30秒内没有向通道写入任何信息,则抛出WriteTimeoutException异常。”

那么,不是应该使用WriteTimeoutHandler实现这个功能吗?从我在网上读到的资料来看,我是这样解释它的,但实现似乎并不是这样工作的。我用错了吗?还是应该使用其他东西?在我们正在尝试重写的Mina版本中,我发现sessionIdle()方法被覆盖以实现我想要的行为,但在Netty中,这种方法是不可用的。

2个回答

14

对于 Netty 4.0 及更高版本,您应该像IdleStateHandler文档中的示例一样扩展ChannelDuplexHandler

 // An example that sends a ping message when there is no outbound traffic
 // for 30 seconds.  The connection is closed when there is no inbound traffic
 // for 60 seconds.

 public class MyChannelInitializer extends ChannelInitializer<Channel> {
     @Override
     public void initChannel(Channel channel) {
         channel.pipeline().addLast("idleStateHandler", new IdleStateHandler(60, 30, 0));
         channel.pipeline().addLast("myHandler", new MyHandler());
     }
 }

 // Handler should handle the IdleStateEvent triggered by IdleStateHandler.
 public class MyHandler extends ChannelDuplexHandler {
     @Override
     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
         if (evt instanceof IdleStateEvent) {
             IdleStateEvent e = (IdleStateEvent) evt;
             if (e.state() == IdleState.READER_IDLE) {
                 ctx.close();
             } else if (e.state() == IdleState.WRITER_IDLE) {
                 ctx.writeAndFlush(new PingMessage());
             }
         }
     }
 }

8
我建议添加IdleStateHandler,然后添加您的自定义实现IdleStateAwareUpstreamHandler来响应空闲状态。在许多不同的项目中,这对我非常有效。

Javadoc列出了以下示例,您可以将其用作实现的基础:

public class MyPipelineFactory implements ChannelPipelineFactory {

    private final Timer timer;
    private final ChannelHandler idleStateHandler;

    public MyPipelineFactory(Timer timer) {
        this.timer = timer;
        this.idleStateHandler = new IdleStateHandler(timer, 60, 30, 0);
        // timer must be shared.
    }

    public ChannelPipeline getPipeline() {
        return Channels.pipeline(
            idleStateHandler,
            new MyHandler());
    }
}

// Handler should handle the IdleStateEvent triggered by IdleStateHandler.
public class MyHandler extends IdleStateAwareChannelHandler {

    @Override
    public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) {
        if (e.getState() == IdleState.READER_IDLE) {
            e.getChannel().close();
        } else if (e.getState() == IdleState.WRITER_IDLE) {
            e.getChannel().write(new PingMessage());
        }
    }
}

ServerBootstrap bootstrap = ...;
Timer timer = new HashedWheelTimer();
...
bootstrap.setPipelineFactory(new MyPipelineFactory(timer));
...

我能够在不到十分钟内按照您建议的更改,并且它完美地运行。谢谢,先生! - ImmuneEntity

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接