Java网络编程:事件驱动的Socket/InputStream

6
我正在实现一个基于Java Socket的事件驱动层,我想知道是否有一种方法可以确定是否存在待读取的数据。
我的正常做法是从socket中读取到缓冲区中,并在缓冲区填满一定量的字节后调用提供的回调函数(如果回调需要每次到达任何内容都触发,则可以为0),但我怀疑Java已经为我做了缓冲。
InputStream的available()方法是否可靠?我应该只是读取并在Socket上面自己进行缓冲吗?还是有其他方法?
2个回答

9
简而言之,不是的。至少对我来说,available() 不可靠。我建议使用与 SelectorSelectionKey 相连的 java.nio.channels.SocketChannel。这个解决方案有点基于事件,但比普通套接字复杂。
对于客户端:
  1. 构造套接字通道 (socket),打开选择器 (selector = Selector.open();)。
  2. 使用非阻塞式 socket.configureBlocking(false);
  3. 为连接注册选择器 socket.register(selector, SelectionKey.OP_CONNECT);
  4. 连接 socket.connect(new InetSocketAddress(host, port));
  5. 查看是否有新内容 selector.select();
  6. 如果“新”指的是成功连接,则为其注册选择器 OP_READ;如果“新”指的是可用数据,则直接从套接字中读取。
然而,为了使其异步化,您需要设置一个单独的线程(尽管套接字被创建为非阻塞式,该线程仍将阻塞),以检查是否有内容到达。
对于服务器,有 ServerSocketChannel 并使用 OP_ACCEPT
参考我的代码 (client),应该能给你一些提示:
 private Thread readingThread = new ListeningThread();

 /**
  * Listening thread - reads messages in a separate thread so the application does not get blocked.
  */
 private class ListeningThread extends Thread {
  public void run() {
   running = true;
   try {
    while(!close) listen();
    messenger.close();
   }
   catch(ConnectException ce) {
    doNotifyConnectionFailed(ce);
   }
   catch(Exception e) {
//    e.printStackTrace();
    messenger.close();
   }
   running = false;
  }
 }

 /**
  * Connects to host and port.
  * @param host Host to connect to.
  * @param port Port of the host machine to connect to.
  */
 public void connect(String host, int port) {
  try {
   SocketChannel socket = SocketChannel.open();
   socket.configureBlocking(false);
   socket.register(this.selector, SelectionKey.OP_CONNECT);
   socket.connect(new InetSocketAddress(host, port));
  }
  catch(IOException e) {
   this.doNotifyConnectionFailed(e);
  }
 }

 /**
  * Waits for an event to happen, processes it and then returns.
  * @throws IOException when something goes wrong.
  */
 protected void listen() throws IOException {
  // see if there are any new things going on
  this.selector.select();
  // process events
  Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
  while(iter.hasNext()) {
   SelectionKey key = iter.next();
   iter.remove();
   // check validity
   if(key.isValid()) {
    // if connectable...
    if(key.isConnectable()) {
     // ...establish connection, make messenger, and notify everyone
     SocketChannel client = (SocketChannel)key.channel();
     // now this is tricky, registering for OP_READ earlier causes the selector not to wait for incoming bytes, which results in 100% cpu usage very, very fast
     if(client!=null && client.finishConnect()) {
      client.register(this.selector, SelectionKey.OP_READ);
     }
    }
    // if readable, tell messenger to read bytes
    else if(key.isReadable() && (SocketChannel)key.channel()==this.messenger.getSocket()) {
     // read message here
    }
   }
  }
 }

 /**
  * Starts the client.
  */
 public void start() {
  // start a reading thread
  if(!this.running) {
   this.readingThread = new ListeningThread();
   this.readingThread.start();
  }
 }

 /**
  * Tells the client to close at nearest possible moment.
  */
 public void close() {
  this.close = true;
 }

对于服务器:

 /**
  * Constructs a server.
  * @param port Port to listen to.
  * @param protocol Protocol of messages.
  * @throws IOException when something goes wrong.
  */
 public ChannelMessageServer(int port) throws IOException {
  this.server = ServerSocketChannel.open();
  this.server.configureBlocking(false);
  this.server.socket().bind(new InetSocketAddress(port));
  this.server.register(this.selector, SelectionKey.OP_ACCEPT);
 }

 /**
  * Waits for event, then exits.
  * @throws IOException when something goes wrong.
  */
 protected void listen() throws IOException {
  // see if there are any new things going on
  this.selector.select();
  // process events
  Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
  while(iter.hasNext()) {
   SelectionKey key = iter.next();
   // do something with the connected socket
   iter.remove();
   if(key.isValid()) this.process(key);
  }
 }

 /**
  * Processes a selection key.
  * @param key SelectionKey.
  * @throws IOException when something is wrong.
  */
 protected void process(SelectionKey key) throws IOException {
  // if incoming connection
  if(key.isAcceptable()) {
   // get client
   SocketChannel client = (((ServerSocketChannel)key.channel()).accept());
    try {
     client.configureBlocking(false);
     client.register(this.selector, SelectionKey.OP_READ);
    }
    catch(Exception e) {
     // catch
    }
  }
  // if readable, tell messenger to read
  else if(key.isReadable()) {
  // read
  }
 }

希望这能帮助到您。

我不明白。您不需要一个单独的线程。非阻塞套接字根据定义不会阻塞。只需正确使用OP_READ和正确的读取循环,在读取返回零时停止即可。 - user207421
@EJP:不是不同意您的观点;但是我感觉,无论是否阻塞,从套接字读取数据仍然会被阻塞,即使没有可读取的内容。也许是我做错了什么。我建议提问者按照您的建议尝试一下,如果不行,再尝试使用线程。 - Miki
你几乎肯定是在 read() 返回零时进行了循环,这就是我提到它的原因。那不是阻塞,而是循环。 - user207421

0

available() 只会告诉你是否可以在不经过操作系统的情况下读取数据。在这里它并不是很有用。

你可以根据自己的喜好进行阻塞或非阻塞读取。非阻塞读取只会在没有数据可读时返回,这可能是你想要的。


2
不正确。available()会告诉您BufferedInputStream/BufferedReader中的数据总和(如果您正在使用其中一个),以及套接字接收缓冲区,这是内核数据结构。如果数据仅在套接字接收缓冲区中,则必须“转到操作系统”才能获取它,但在此过程中不会阻塞。正如Javadoc所说。但是,如果例如是SSLSocket,则available()始终返回零。 - user207421

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接