Netty中连接关闭后重新连接的最佳方法是什么?

9
简单场景:
  1. A lower level class A that extends SimpleChannelUpstreamHandler. This class is the workhorse to send the message and received the response.
  2. A top level class B that can be used by other part of the system to send and receive message (can simulate both Synchronous and Asynchronous). This class creates the ClientBootstrap, set the pipeline factory, invoke the bootstrap.connect() and eventually get a handle/reference of the class A through which to be used to send and receive message. Something like:

    ChannelFuture future = bootstrap.connect();
    Channel channel = future.awaitUninterruptibly().getChannel();
    

    A handler = channel.getPipeline().get(A.class);

我知道在A类中,我可以重写public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)方法,这样当远程服务器挂掉时,我就能得到通知。

由于在通道关闭后,在B类中的原始A类引用(上面的处理程序)将不再有效,因此我需要用新引用替换它。

理想情况下,我希望A类有一个机制,在上述覆盖的channelClosed方法中通知B类,以便可以在B类中再次调用bootstrap.connect。实现这一点的一种方法是在A类中引用B类。为此,我需要将B类的引用传递给PipelineFactory,然后由PipelineFactory将B的引用传递给A。

是否有其他更简单的方法来实现同样的效果?

谢谢,

3个回答

19

Channel.closeFuture() 返回一个ChannelFuture,该对象将通知您通道何时关闭。在B中,您可以向该future添加一个ChannelFutureListener,以便在那里进行另一个连接尝试。

您可能希望重复执行此操作,直到最终连接尝试成功:

private void doConnect() {
    Bootstrap b = ...;
    b.connect().addListener((ChannelFuture f) -> {
        if (!f.isSuccess()) {
            long nextRetryDelay = nextRetryDelay(...);
            f.channel().eventLoop().schedule(nextRetryDelay, ..., () -> {
                doConnect();
            }); // or you can give up at some point by just doing nothing.
        }
    });
}

3
除了使用多于1个线程的NioEventLoopGroup时,一切都很好。每次调度时都会创建一个新的工作线程(最多达到我的池中的最大值),旧的工作线程没有被处理掉,有任何想法为什么会这样? - Alexandre Pauzies
1
我有过类似的结果(即每次重新连接尝试时都会创建一个新的工作线程)。你是否确定了处理此问题的适当方法? - Brandon E Taylor
我也遇到了同样的问题。@Brandon E Taylor,你找到解决方案了吗? - StasKolodyuk

0

我不确定这是否是正确的解决方案,但为了修复Trustin的方案中的线程泄漏问题,我发现可以在调度程序触发后关闭事件循环:

final EventLoop eventloop = f.channel().eventLoop();
b.connect().addListener((ChannelFuture f) -> {
    if (!f.isSuccess()) {
        long nextRetryDelay = nextRetryDelay(...);
        eventloop.schedule(() -> {
            doConnect();
            eventloop.shutdownGracefully();
        }, nextRetryDelay, ...);
    }
});

这段代码不起作用。要从通道获取事件循环,必须先进行连接。 - undefined

-1
这是另一个版本,将重新连接行为封装在一个小的辅助类中。
Bootstrap clientBootstrap...
EventLoopGroup group = new NioEventLoopGroup();

Session session = new Session(clientBootstrap,group);
Disposable shutdownHook = session.start();    

interface Disposable {
   void dispose();
}
class Session implements Disposable{    
    private final EventLoopGroup scheduler;
    private final Bootstrap clientBootstrap;

    private int reconnectDelayMs;
    private Channel activeChannel;
    private AtomicBoolean shouldReconnect;

    private Session(Bootstrap clientBootstrap, EventLoopGroup scheduler) {
        this.scheduler = scheduler;
        this.clientBootstrap = clientBootstrap;
        this.reconnectDelayMs = 1;
        this.shouldReconnect = new AtomicBoolean(true);
    }

    public Disposable start(){
        //Create a new connectFuture
        ChannelFuture connectFuture = clientBootstrap.connect();

        connectFuture.addListeners( (ChannelFuture cf)->{
            if(cf.isSuccess()){
                L.info("Connection established");
                reconnectDelayMs =1;                    
                activeChannel = cf.channel();

                //Listen to the channel closing
                var closeFuture =activeChannel.closeFuture();
                closeFuture.addListeners( (ChannelFuture closeFut)->{
                    if(shouldReconnect.get()) {
                        activeChannel.eventLoop().schedule(this::start, nextReconnectDelay(), TimeUnit.MILLISECONDS);
                    }
                    else{
                        L.info("Session has been disposed won't reconnect");
                    }
                });
            }
            else{
                int delay =nextReconnectDelay();
                L.info("Connection failed will re-attempt in {} ms",delay);
                cf.channel().eventLoop().schedule(this::start,delay , TimeUnit.MILLISECONDS);
            }
        });
        
        return this;
    }

    /**
     * Call this to end the session
     */
    @Override
    public void dispose() {
        try {
            shouldReconnect.set(false);
            scheduler.shutdownGracefully().sync();
            if(activeChannel !=null) {
                activeChannel.closeFuture().sync();
            }
        }catch(InterruptedException e){
            L.warn("Interrupted while shutting down TcpClient");
        }
    }

    private int nextReconnectDelay(){
        this.reconnectDelayMs = this.reconnectDelayMs*2;
        return Math.min(this.reconnectDelayMs, 5000);
    }
}

这段代码不起作用。你无法从失败的通道中获取事件循环。 - undefined

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接