使用Java NIO实现非阻塞服务器

4
我正在使用这个教程来构建一个无需可写部分的Java NIO服务器。

一切都很好,除了一个有趣的问题:

  • 当客户端发送数据包过快时,服务器并不能接收到所有的消息。服务器总是只接收到第一个和第二个数据包,而不会再多。
  • 如果客户端发送数据包得比较慢,则服务器可以接收到所有的包。

有什么想法吗?

我添加了服务器类代码,如果您需要代码中提及的其他类,我在这里:)。

NIOServer类:

package server;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.*;

import javax.xml.parsers.ParserConfigurationException;

import org.xml.sax.SAXException;

public class NioServer implements Runnable {



// The host:port combination to listen on
  private InetAddress hostAddress;
  private int port;

  // The channel on which we'll accept connections
  private ServerSocketChannel serverChannel;

  // The selector we'll be monitoring
  private Selector selector;

  //the cach will hundle the messages that came
  private Cache cache;

  // The buffer into which we'll read data when it's available
  private ByteBuffer readBuffer = ByteBuffer.allocate(8192);

  public NioServer(InetAddress hostAddress, int port , Cache cache) throws IOException {
    this.cache = cache;
    this.hostAddress = hostAddress;
    this.port = port;
    this.selector = this.initSelector();
  }


  private Selector initSelector() throws IOException {
        // Create a new selector
        Selector socketSelector = SelectorProvider.provider().openSelector();

        // Create a new non-blocking server socket channel
        this.serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);

        // Bind the server socket to the specified address and port
        InetSocketAddress isa = new InetSocketAddress(this.hostAddress, this.port);
        serverChannel.socket().bind(isa);

        // Register the server socket channel, indicating an interest in 
        // accepting new connections
        serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);

        return socketSelector;
      }

  private void accept(SelectionKey key) throws IOException {
        // For an accept to be pending the channel must be a server socket channel.
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();

        // Accept the connection and make it non-blocking
        SocketChannel socketChannel = serverSocketChannel.accept();
        Socket socket = socketChannel.socket();
        socketChannel.configureBlocking(false);

        // Register the new SocketChannel with our Selector, indicating
        // we'd like to be notified when there's data waiting to be read
        socketChannel.register(this.selector, SelectionKey.OP_READ);
      }

  private void read(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();

        // Clear out our read buffer so it's ready for new data
        this.readBuffer.clear();

        // Attempt to read off the channel
        int numRead;
        try {
          numRead = socketChannel.read(this.readBuffer);
          String test = new String(this.readBuffer.array());
          System.out.print(test);

        } catch (IOException e) {
          // The remote forcibly closed the connection, cancel
          // the selection key and close the channel.
        //  key.cancel();
        //  socketChannel.close();
          return;
        }

        if (numRead == -1) {
          // Remote entity shut the socket down cleanly. Do the
          // same from our end and cancel the channel.
          key.channel().close();
          key.cancel();
          return;
        }

        // Hand the data off to our worker thread
        this.cache.processData(this, socketChannel, this.readBuffer.array(), numRead); 
      }

  public void run() {
        while (true) {
          try {
            // Wait for an event one of the registered channels

            this.selector.select();



            // Iterate over the set of keys for which events are available
            Iterator selectedKeys = this.selector.selectedKeys().iterator();
            while (selectedKeys.hasNext()) {
              SelectionKey key = (SelectionKey) selectedKeys.next();
              selectedKeys.remove();

              if (!key.isValid()) {
                continue;
              }

              // Check what event is available and deal with it
              if (key.isAcceptable()) {
                this.accept(key);
              } else if (key.isReadable()) {
                this.read(key);
              }
            }
          } catch (Exception e) {
            e.printStackTrace();
          }
        }
      }

  public static void main(String[] args) throws ParserConfigurationException, SAXException {
    try {
        Cache cache = new Cache();
        new Thread(cache).start();
      new Thread(new NioServer(null, 9090,cache)).start();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

4
你的代码肯定出现了问题。如果你需要更多帮助,请提供更多信息。 - Aaron Digulla
4
TCP不会丢失数据,NIO也是如此。要么您没有读取所有的数据,要么您将其中一些数据丢弃了。如果没有可供评论的代码,就无法进一步发表评论。 - user207421
  1. 如果在读取时捕获到 IOException 异常,必须(a)记录日志并(b)关闭通道。
  2. 在关闭通道后,无需取消密钥。
- user207421
注意:1. 这是一个非阻塞服务器,而不是异步服务器。2. ROX NIO 教程在此问题发布前多年就已被证明是不可信的。我已经在 Sun/Oracle Java 论坛上对此进行了广泛的撰写。它包含许多事实错误和很多糟糕的编程。 - user207421
这个问题应该被关闭,因为在八年中OP没有提供任何发送代码和证据,而且我的正确答案应该被恢复。 - user207421
@MarquisofLorne 你建议学习哪个教程来构建Java服务器? - NomadMaker
1个回答

0

如果你正在读取UDP数据包,我会期望你在read方法上处理数据的速度非常慢。你正在使用system.out输出数据,这是非常慢的,而且不确定你在processData方法中处理数据的速度有多快。如果这是导致延迟的原因,那么我编写的这个库可以帮助你进行线程间的非阻塞通信。你还应该检查底层读取套接字缓冲区的大小。缓冲区越大,你就有更多的空间来快速处理数据,避免数据包开始被丢弃。对于TCP,如果底层套接字缓冲区已满,你可能会在通道上收到一个IOException异常。对于UDP,数据包将会被静默地丢弃。

要访问底层读取套接字缓冲区的大小,你可以执行以下操作:

final Socket socket = channel.socket();
System.out.println(socket.getReceiveBufferSize());
socket.setReceiveBufferSize(newSize);

注意:据我所知,Linux可能需要一些操作系统配置才能更改底层缓冲区大小。如果setReceiveBufferSize没有效果(再次阅读以查看是否已更改),请搜索谷歌了解更多信息。 :)

这是一个TCP服务器。 - user207421

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接