Netty Nio中的Promise异步更新

10

我有一个服务器和客户端架构,它们交换信息。我想从服务器返回连接通道的数量。我想使用Promise将服务器的消息返回给客户端。我的代码如下:

public static void callBack () throws Exception{

   String host = "localhost";
   int port = 8080;

   try {
       Bootstrap b = new Bootstrap();
       b.group(workerGroup);
       b.channel(NioSocketChannel.class);
       b.option(ChannelOption.SO_KEEPALIVE, true);
       b.handler(new ChannelInitializer<SocketChannel>() {
        @Override
           public void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new RequestDataEncoder(), new ResponseDataDecoder(), new ClientHandler(promise));
           }
       });
       ChannelFuture f = b.connect(host, port).sync();
       //f.channel().closeFuture().sync();
   }
   finally {
    //workerGroup.shutdownGracefully();
   }
}

public static void main(String[] args) throws Exception {

  callBack();
  while (true) {

    Object msg = promise.get();
    System.out.println("The number if the connected clients is not two");
    int ret = Integer.parseInt(msg.toString());
    if (ret == 2){
        break;
    }
  }
  System.out.println("The number if the connected clients is two");
}

当我运行一个客户端时,它总是接收到消息The number if the connected clients is not two并且返回的数字总是一个。当我运行第二个客户端时,它总是作为返回值接收到两个,但是第一个客户端仍然接收到一个。我无法找到更新第一个客户端的承诺的正确方法。

编辑: 客户端服务器:

public class ClientHandler extends ChannelInboundHandlerAdapter {
  public final Promise<Object> promise;
  public ClientHandler(Promise<Object> promise) {
      this.promise = promise;
  }

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
      RequestData msg = new RequestData();
      msg.setIntValue(123);
      msg.setStringValue("all work and no play makes jack a dull boy");
      ctx.writeAndFlush(msg);
  }

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      System.out.println(msg);
      promise.trySuccess(msg);
  }
} 

客户端处理程序从服务器接收到的消息存储到Promise中的代码。


当你说“promise”时,你是指非阻塞吗? - rohitmohta
我的意思是这个对象 msg = promise.get();,这个值包含了服务器返回的消息。 - konstantin
你可以关注我的回答,它适用于不同的问题 https://dev59.com/N1YN5IYBdhLWcg3w07Bx#46925540 - rkosegi
@konstantin 根据你的问题,你的意思是:promise.get() 返回连接到你的服务器的客户端(频道)数量? - rohitmohta
在客户端处理程序中,我正在存储从客户端读取的消息。https://github.com/kristosh/netty-nio-Client-Server - konstantin
一个 Promise 只能被更新一次。 - Ferrybig
1个回答

7
使用Netty框架,PromiseFuture是只写对象,这个原则使它们在多线程环境中更易于使用。
由于Promise不能实现您想要的功能,我们需要查看其他技术是否适合您的条件,您的条件基本上可以归结为:
从多个线程读取 仅从单个线程写入(因为在Netty通道内部,除非将通道标记为可共享,否则读取方法只能由1个线程同时执行) 对于这些要求,最合适的匹配是volatile变量,因为它对于读取是线程安全的,并且可以安全地由1个线程更新而不必担心写入顺序。
要更新您的代码以使用volatile变量,需要进行一些修改,因为我们无法轻松地将变量的引用链接传递到函数内部,但我们必须传递一个更新后端变量的函数。
private static volatile int connectedClients = 0;
public static void callBack () throws Exception{
    //....
           ch.pipeline().addLast(new RequestDataEncoder(), new ResponseDataDecoder(),
                                 new ClientHandler(i -> {connectedClients = i;});
    //....
}

public static void main(String[] args) throws Exception {

  callBack();
  while (true) {
    System.out.println("The number if the connected clients is not two");
    int ret = connectedClients;
    if (ret == 2){
        break;
    }
  }
  System.out.println("The number if the connected clients is two");
}

public class ClientHandler extends ChannelInboundHandlerAdapter {
  public final IntConsumer update;
  public ClientHandler(IntConsumer update) {
      this.update = update;
  }

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
      RequestData msg = new RequestData();
      msg.setIntValue(123);
      msg.setStringValue("all work and no play makes jack a dull boy");
      ctx.writeAndFlush(msg);
  }

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      System.out.println(msg);
      update.accept(Integer.parseInt(msg));
  }
} 

虽然上述方法应该可以工作,但我们很快发现主类中的while循环使用了大量CPU时间,这可能会影响本地客户端系统的其他部分,幸运的是,如果我们向系统添加其他部分,即同步,这个问题也可以解决。通过将对connectedClients的初始读取留在同步块之外,我们仍然可以从快速读取中获益,在“true”情况下,而在“false”情况下,我们可以安全地节省重要的CPU周期,这些周期可以用于系统的其他部分。
为解决这个问题,在读取时我们采用以下步骤:
  1. connectedClients的值存储在一个单独的变量中
  2. 将此变量与目标值进行比较
  3. 如果为true,则尽早跳出循环
  4. 如果为false,则进入同步块
  5. 开始一个while true循环
  6. 再次读取变量,因为现在值可能已经改变
  7. 检查条件,如果条件正确则退出循环
  8. 如果不是,则等待值的更改

写入时采用以下步骤:

  1. 同步
  2. 更新该值
  3. 唤醒所有等待此值的其他线程

以下代码可以实现此操作:

private static volatile int connectedClients = 0;
private static final Object lock = new Object();
public static void callBack () throws Exception{
    //....
           ch.pipeline().addLast(new RequestDataEncoder(), new ResponseDataDecoder(),
                                 new ClientHandler(i -> {
               synchronized (lock) {
                   connectedClients = i;
                   lock.notifyAll();
               }
           });
    //....
}

public static void main(String[] args) throws Exception {

  callBack();
  int connected = connectedClients;
  if (connected != 2) {
      System.out.println("The number if the connected clients is not two before locking");
      synchronized (lock) {
          while (true) {
              connected = connectedClients;
              if (connected == 2)
                  break;
              System.out.println("The number if the connected clients is not two");
              lock.wait();
          }
      }
  }
  System.out.println("The number if the connected clients is two: " + connected );
}

服务器端更改

然而,并非所有的问题都与客户端有关。

由于您发布了一个指向 GitHub 存储库的链接,当新用户加入时,您从服务器未向旧客户端发送请求。因为这没有完成,客户端永远不会收到有关更改的通知,请确保也执行此操作。


更新后的代码可以在这里找到:https://github.com/kristosh/netty-nio-Client-Server - konstantin
那可能是我在你原始代码中错过的东西,在CLientHandler内部,不要使用update.accept(Integer.parseInt(msg));,而是使用update.accept(((RequestData)msg).getIntValue());,这将直接从你的请求数据对象中提取大小。 - Ferrybig
嗯,GitHub上的代码没有正确更新。事实上,处理程序返回通道数(测试代码而非GitHub上的代码)。发生的情况是第一个客户端正在接收以下消息: - konstantin
连接的客户端数量在锁定之前不是两个 连接的客户端数量不是两个 1 连接的客户端数量不是两个 - konstantin
为什么 int connected = connectedClients; 对于第一个客户端始终是1? - konstantin
显示剩余32条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接