NIO选择器:如何在选择时正确注册新通道

12

我有一个继承了 Thread 的子类,它有一个私有的 Selector 和一个公共的 register(SelectableChannel channel, ...) 方法,允许其他线程将通道注册到选择器中。

正如这里所回答的,当选择器的 select() / select(long timeout) 被调用时,通道的 register() 会被阻塞,因此我们需要使用 wakeup() 来唤醒选择器。

我的线程一直执行选择操作(除非它被中断),并且实际上在通道的 register() 被调用之前就已经进入了下一个选择操作。因此,我想使用简单的锁和 synchronized 块来确保 register() 首先执行。

代码如下:(为了可读性删除了不相关的代码)

public class SelectorThread extends Thread {
  ...

  public void register(SelectableChannel channel, Attachment attachment) throws IOException {
    channel.configureBlocking(false);
    synchronized (this) { // LOCKING OCCURS HERE
      selector.wakeup();
      channel.register(selector,
                       SelectionKey.OP_READ,
                       attachment);
    }
  }

  @Override
  public void run() {
    int ready;
    Set<SelectionKey> readyKeys;
    while (!isInterrupted()) {
      synchronized (this) {} // LOCKING OCCURS HERE

      try {
        ready = selector.select(5000);
      } catch (IOException e) {
        e.printStackTrace();
        continue;
      }

      if (ready == 0) {
        continue;
      }

      readyKeys = selector.selectedKeys();

      for (SelectionKey key : readyKeys) {
        readyKeys.remove(key);

        if (!key.isValid()) {
          continue;
        }

        if (key.isReadable()) {
          ...
        }
      }
    }
  }
}

这个简单的锁允许在线程继续下一个选择循环之前进行register()。据我测试,这个方法是可行的。

问题: 这是一种“好”的方式吗?还是有什么严重的缺点?使用List或Queue(如此处所建议)来存储通道以进行注册,或者使用类似这个更复杂的锁会更好吗?这样做的优缺点是什么?还有任何“更好”的方法吗?


4个回答

5

请像Darron建议的那样,把Selector等工具视为不支持多线程,将所有与选择相关的操作都在同一个线程上执行。

NIO selector的并发模型是糟糕的,我必须这样说。因为对于每个试图学习它的人来说,这是一种巨大的时间浪费。最终的结论是,忘记它吧,它不适用于并发使用。


3
我其实很惊讶空块的锁定没有在编译时被移除。很酷它能够工作。我的意思是,它能够工作,它是抢占式的,虽然不是最美观的方法,但它能够工作。它比睡眠更好,因为它是可预测的,并且由于你使用唤醒调用,你知道会按需进行进展,而不是在纯粹依赖于选择超时的周期性更新上。
这种方法的主要缺点是你正在说注册调用胜过任何其他事情,即使服务请求。在你的系统中可能是正确的,但通常情况下并非如此,我认为这是一个可能存在的问题。一个较小的问题是,你在 SelectorThread 本身上锁定了它,这在这种情况下是一个较大的对象。不错,但随着扩展,这个锁定将需要记录,并考虑到其他客户端使用该类时。个人而言,我会选择另外一个锁定,以避免任何未预料到的未来危险。
个人而言,我喜欢排队技术。这种方式可以为你的线程分配角色,例如主线程和工作线程。所有类型的控制都在主线程上进行,例如在每次选择检查队列中是否有更多的注册信息后,清除并分配任何读取任务,在整个连接设置中处理任何更改(断开连接等)...“bs”并发模型似乎非常接受这种模型,它是一个相当标准的模型。我认为这不是一件坏事,因为它使代码变得更加简洁,更易于测试和阅读。只需要花费更多的时间编写。

虽然我承认,我已经很久没有写过这些东西了,但还有其他库可以帮助你进行排队。

Grizzly Nio Framework 虽然有点老,但上次我使用它时,主运行循环还不错。它为你设置了很多排队工作。

Apache Mina 类似于提供了一个排队框架。

但是最终取决于你正在处理什么。

  • 这是一个单人项目,只是为了玩弄框架吗?
  • 这是一段生产代码,你希望它能够长期存在吗?
  • 这是一段正在迭代的生产代码吗?

除非您计划将其用作向客户提供服务的核心组件,否则我认为您的方法是可行的。但它在长期运行中可能会有维护问题。


非常好的输入,谢谢。我认为注册不会胜过服务,因为唤醒只影响等待选择器,而不是循环的其余部分。至于锁定,使用一个简单的私有对象进行锁定是否更好?(一个仅用于锁定寄存器调用的对象)我不希望同时注册大量通道,所以我认为注册队列有点过度了。但我喜欢这个想法,可能会在将来实现它。 - riha
关于选择和服务,是的,我就是这个意思。比如你会花费一个空周期来处理刚刚发生的注册。虽然不是很糟糕,但需要额外的锁并立即处理。在排队系统中,通常有非阻塞队列,这种队列可以减轻对这些锁的需求。 - Greg Giacovelli
关于锁定,我会创建一个内部锁,专门用于处理选择器。但是听起来你已经准备好了。 - Greg Giacovelli

2

register()之前,您只需要一个wakeup(),并且在选择循环中,如果'ready'为零,则短暂休眠后继续执行,以给register()一个运行的机会。没有额外的同步:已经够糟糕了;不要让它变得更糟。我不喜欢这些需要注册、取消、更改兴趣操作等事物的队列:它们只是将可以真正并行完成的事情顺序化了。


1
@riha 嗯?我并没有说注册通道只能在 select() 线程中完成。可以通过我在回答你关于这个问题的描述中提到的技术,在单独的线程中完成注册。这很清楚,不是吗?至于 sleep,100 毫秒应该足够了。 - user207421
1
@riha 当你在select()中时,锁只存在于那里。因此,你需要唤醒选择器以使其退出select();如果select()返回一个正值,则循环有一些工作要做,在此期间你可以执行register();否则它返回零,因此你需要进行短暂的sleep(),在此期间你可以执行register()。 - user207421
1
现在我明白了你的意思,感谢你的解释。但是我认为同步块几乎与短暂的休眠具有相同的效果。如果没有工作要做,它甚至可能比从睡眠返回得更早。在这种情况下,睡眠感觉像是伪装的同步。 - riha
1
@riha 即使它是用来“延迟”同步,这对我来说也没有任何意义。 - user207421
-1 这个解决方案不可靠,因此是错误的,正如这里这里所解释的那样。正确答案是 https://dev59.com/pHNA5IYBdhLWcg3wL6yx#2179612 - Flow
显示剩余8条评论

0
一种可能的方法是将通道注册(或其他需要在NIO循环内完成的外部任务)注入选择循环中,示例如下。
//private final Set<ExternalEvent> externalTaskEvents = ConcurrentHashMap.newKeySet();
//...

while (!Thread.currentThread().isInterrupted()) {
    try {
        selector.select();
    } catch (IOException ex) {
        ex.printStackTrace(Log.logWriter);
        return;
    }

    //handle external task events
    Iterator<ExternalEvent> eitr = externalTaskEvents.iterator();
    while (eitr.hasNext()) {
        ExternalEvent event = eitr.next();
        eitr.remove();
        if(event.task != null){
            event.task.accept(event);
        }
    }

    //handle NIO network events
    Iterator<SelectionKey> nitr = selector.selectedKeys().iterator();
    while (nitr.hasNext()) {
        SelectionKey key = nitr.next();
        nitr.remove();
        if (!key.isValid()) {
            continue;
        }
        try {
            if (key.isAcceptable()) {
                onAcceptable(key);
            } else if (key.isConnectable()) {
                onConnectable(key);
            } else {
                if (key.isReadable()) {
                    onReadable(key);
                }
                if (key.isWritable()) {
                    onWritable(key);
                }
            }
        } catch (IOException | InterruptedException | CancelledKeyException ex) {
            ex.printStackTrace(Log.logWriter);
            //...
        }
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接