Netty Nio Java中的通信

10

我想在Netty nio中创建一个具有两个客户端和一个服务器的通信系统。更具体地说,当两个客户端与服务器连接时,首先,我希望从服务器发送一条消息,然后能够在两个客户端之间交换数据。我正在使用这个示例提供的代码。我的代码修改可以在这里找到

在serverHandler中,似乎只有当第一个客户端连接时channelRead才能正常工作,因此它总是返回1,但是当第二个客户端连接时,值没有改变为2。如何正确检查服务器何时连接了两个客户端?我该如何从客户端的主要函数动态读取此值?另外,让两个客户端通信的最佳方法是什么?

EDIT1: 显然,客户端服务正在运行并直接关闭,因此每次运行新的NettyClient都会连接,但是连接随后被关闭。因此,计数器始终从零更改为一。如下面的评论所建议的那样,我使用telnet在同一端口上进行了测试,计数器似乎正常增加,然而,使用NettyClient服务时并不是这样。

EDIT2: 问题似乎出在ProcessingHandler类中的channelRead中的future.addListener(ChannelFutureListener.CLOSE);。当我将其注释掉后,似乎代码可以正常工作。但是,我不确定注释掉它的后果。此外,我希望从客户端的主要函数检查返回消息是否为特定的两个。我该如何创建一种方法,等待来自服务器的特定消息,同时阻止主要功能?

 static EventLoopGroup workerGroup = new NioEventLoopGroup();
 static Promise<Object> promise = workerGroup.next().newPromise(); 
 public static void callClient() throws Exception {
    String host = "localhost";
    int port = 8080;
    try {
        Bootstrap b = new Bootstrap();
        b.group(workerGroup);
        b.channel(NioSocketChannel.class);
        b.option(ChannelOption.SO_KEEPALIVE, true);
        b.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new RequestDataEncoder(), new ResponseDataDecoder(), new ClientHandler(promise));
            }
        });
        ChannelFuture f = b.connect(host, port).sync();
    } finally {
        //workerGroup.shutdownGracefully();
    }
}

我想在主函数中调用该方法并返回结果,当结果为2时,继续进行主要功能。但是,由于它将多次运行相同的客户端,因此无法在while内部调用callClient。

   callBack();
    while (true) {
        Object msg = promise.get();
        System.out.println("Case1: the connected clients is not two");
        int ret = Integer.parseInt(msg.toString());
        if (ret == 2){
            break;
        }
    }
   System.out.println("Case2: the connected clients is two");
   // proceed with the main functionality

如何更新第一个客户端的Promise变量。当我运行两个客户端时,对于第一个客户端,我始终收到消息:

Case1:连接的客户端不是两个

似乎Promise没有正常更新,而对于第二个客户端,我总是收到:

Case2:连接的客户端是两个


{btsdaf} - Tarun Lalwani
https://github.com/kristosh/netty-nio-Client-Server - Jose Ramon
{btsdaf} - Jose Ramon
{btsdaf} - Jose Ramon
2个回答

2
如果我没记错的话,ChannelHandlerContext是每个通道一个,并且它可以在其管道中有多个ChannelHandlers。您的channels变量是处理程序类的实例变量。对于每个连接,您都会创建一个新的ProcessingHandler实例。因此,一旦初始化,每个连接在channels变量中将仅有一个 - 它被创建用于的那个连接。
请参见服务器代码(NettyServer.java)中initChannel函数中的new ProcessingHandler()。
您可以使channels变量静态,以便在ProcessingHandler实例之间共享。或者您可以在其他地方创建单个ProcessingHandler实例(例如,在run()函数中作为局部变量),然后将该实例传递给addLast调用,而不是new ProcessingHandler()。

事实上,在您目前的代码中,ctx.close() 关闭了客户端的套接字(或者更确切地说,启动了关闭过程),然后 f.channel().closeFuture().sync() 在关闭过程完成后被通知,只有在此之后 workerGroup.shutdownGracefully() 才会被调用(到那时,应用程序中仅剩的套接字已经被关闭,因此已经没有什么需要关闭的了)。 - Seva
那么肯定会有更多的错误。计数是1,因为您从未同时连接超过1个客户端。对于telnet,请在命令行中运行“telnet localhost 8080”。它无法与您的服务器正常通信,但它将连接并显示通道中的连接计数。 - Seva
我认为我找到了原因,就是在processingHandler(服务器端)中的future.addListener(ChannelFutureListener.CLOSE)。如果我将其注释掉会发生什么? - Jose Ramon
如果客户端一直保持连接,对我来说会有问题吗?实际上我想要做的是在两个客户端之间建立通信并玩一个JavaFX游戏(多人游戏场景)。因此,我首先想要从服务器接收到两个客户端已连接的消息,然后在客户端之间交换信息,并最终关闭连接。所以我想在游戏结束时关闭连接。 - Jose Ramon
顺便提一下,我将NettyClient中的代码添加到一个名为callClient的方法中,该方法返回channelGroup的大小,并且它正常工作。但是,我想从主函数中调用此函数,在while循环内阻止代码的功能,并在返回两个通道数时继续主函数的操作。如何异步检查通道的大小。 - Jose Ramon
显示剩余12条评论

2
为什么ChannelGroup通道的大小始终为1,即使我连接了更多的客户端?
因为每个新的Channel(客户端)都会调用子ChannelInitializer。在那里,您正在创建ProcessingHandler的新实例,因此每个通道都看到自己的ChannelGroup实例。
解决方案1 - Channel Attribute 使用Attribute并将其与Channel关联起来。
在某个地方创建属性(比如Constants类):
public static final AttributeKey<ChannelGroup> CH_GRP_ATTR = 
       AttributeKey.valueOf(SomeClass.class.getName());

现在创建一个 ChannelGroup,所有 ProcessingHandler 实例都将使用它:
final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

在NettyServer中更新您的子ChannelInitializer
@Override
public void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline().addLast(
        new RequestDecoder(), 
        new ResponseDataEncoder(), 
        new ProcessingHandler());

    ch.attr(Constants.CH_GRP_ATTR).set(channels);
}

现在您可以像这样在处理程序中访问 ChannelGroup 实例:
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    final ChannelGroup channels = ctx.channel().attr(Constants.CH_GRP_ATTR).get();
    channels.add(ctx.channel());

这将起作用,因为每次新客户端连接时,ChannelInitializer 将使用对 ChannelGroup 的相同引用进行调用。
解决方案 2 - 静态字段
如果您将 ChannelGroup 声明为静态的,则所有类实例都将“看到”同一个 ChannelGroup 实例:
private static final ChannelGroup channels =
     new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

解决方案三:传播共享实例
ProcessingHandler 的构造函数中引入参数:
private final ChannelGroup channels;
public ProcessingHandler(ChannelGroup chg) {
    this.channels = chg;
}

现在,在您的NettyServer类中创建ChannelGroup实例,并将其传递给ProcessingHandler构造函数:
final ChannelGroup channels = new 
      DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

@Override
public void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline().addLast(
        new RequestDecoder(), 
        new ResponseDataEncoder(), 
        new ProcessingHandler(channels)); // <- here
}

个人而言,我会选择第一种解决方案,因为:
  • 它能够清晰地将 ChannelGroup 与 Channel 上下文关联起来
  • 您可以在其他处理程序中访问相同的 ChannelGroup
  • 您可以拥有多个服务器实例(在同一个 JVM 中运行,但在不同的端口上)

{btsdaf} - Jose Ramon
{btsdaf} - rkosegi
@JoseRamon:如果“add”被调用两次,通道大小为1在正常情况下是不可能的。你能确认一下吗(例如通过调试器)? - rkosegi
{btsdaf} - Jose Ramon
{btsdaf} - rkosegi
显示剩余6条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接