非阻塞套接字

20

在Java中实现非阻塞套接字的最佳方法是什么?

或者有这样的东西吗?我有一个通过套接字与服务器通信的程序,但如果数据/连接存在问题,我不希望套接字调用被阻止/导致延迟。

5个回答

34
Java非阻塞套接字是在Java 2标准版1.4中引入的。它允许应用程序之间使用套接字进行网络通信而不会阻塞进程。但是,Teo,什么是非阻塞套接字?在哪些情况下它可以有用?它是如何工作的?好吧,年轻的帕达万,让我们回答这些问题。
非阻塞套接字允许在通道上进行I/O操作而不会阻塞使用它的进程。这意味着我们可以使用单个线程来处理多个并发连接,并获得"异步高性能"的读/写操作(有些人可能不同意这一点)。
好的,在哪些情况下它可以有用?
假设您想要实现一个接受各种客户端连接的服务器。假设您还希望服务器能够同时处理多个请求。使用传统的方式,您有两种选择来开发这样的服务器:
a. 实现一个多线程服务器,手动处理每个连接的线程。 b. 使用外部的第三方模块。
两种解决方案都可以,但是采用第一种方案,你必须开发整个线程管理解决方案,涉及并发和冲突问题。第二种方案使应用程序依赖于非JDK的外部模块,可能需要根据你的需求来适应该库。通过非阻塞套接字,你可以实现一个非阻塞服务器,而无需直接管理线程或使用外部模块。

它是如何工作的?

在进入细节之前,有几个术语你需要了解:
  • 在基于NIO的实现中,我们不是将数据写入输出流和从输入流读取数据,而是从缓冲区中读取和写入数据。缓冲区可以被定义为临时存储。
  • 通道将大量数据传输到缓冲区中,并从缓冲区中传输数据。同时,它也可以被视为通信的端点。
  • 就绪选择是一个概念,指的是“在读取或写入数据时选择不会阻塞的套接字的能力。”
Java NIO有一个名为Selector的类,它允许单个线程检查多个通道上的I/O事件。这是如何实现的呢?嗯,选择器可以检查通道的“准备就绪”状态,以便处理事件,例如客户端尝试连接或读写操作。换句话说,每个Selector实例可以监视更多的套接字通道,从而处理更多的连接。当通道上发生某些事件时,选择器会通知应用程序处理请求。选择器通过创建事件键(或选择键)来实现这一点,这些键是SelectionKey类的实例。每个键都包含有关“谁发出请求”和“请求类型是什么”的信息,如图1所示。
图1:结构图
一个基本的实现
一个服务器实现由一个无限循环组成,在循环中,选择器(selector)等待事件并创建事件键。事件键有四种可能的类型:
- 可接受(Acceptable):关联的客户端请求连接。 - 可连接(Connectable):服务器接受了连接。 - 可读(Readable):服务器可以读取。 - 可写(Writable):服务器可以写入。
通常,可接受的键是在服务器端创建的。实际上,这种类型的键只是通知服务器客户端需要连接,然后服务器确定套接字通道并将其与选择器关联以进行读/写操作。之后,当接受的客户端读取或写入数据时,选择器将为该客户端创建可读或可写的键。
现在,您可以按照提议的算法使用Java编写服务器。可以按照以下方式创建套接字通道、选择器和套接字选择器注册:
final String HOSTNAME = "127.0.0.1";
final int PORT = 8511;

// This is how you open a ServerSocketChannel
serverChannel = ServerSocketChannel.open();
// You MUST configure as non-blocking or else you cannot register the serverChannel to the Selector.
serverChannel.configureBlocking(false);
// bind to the address that you will use to Serve.
serverChannel.socket().bind(new InetSocketAddress(HOSTNAME, PORT));

// This is how you open a Selector
selector = Selector.open();
/*
* Here you are registering the serverSocketChannel to accept connection, thus the OP_ACCEPT.
* This means that you just told your selector that this channel will be used to accept connections.
* We can change this operation later to read/write, more on this later.
*/
serverChannel.register(selector, SelectionKey.OP_ACCEPT);

首先,我们使用ServerSocketChannel.open()方法创建一个SocketChannel实例。接下来,调用configureBlocking(false)将该channel设置为非阻塞模式。通过serverChannel.socket().bind()方法与服务器建立连接。其中,HOSTNAME表示服务器的IP地址,PORT表示通信端口。最后,调用Selector.open()方法创建一个selector实例,并将其注册到channel和注册类型上。在本例中,注册类型为OP_ACCEPT,表示选择器仅报告客户端尝试连接到服务器。其他可能的选项有:OP_CONNECT(用于客户端)、OP_READOP_WRITE
现在,我们需要使用一个无限循环来处理这些请求。一个简单的方法如下:
// Run the server as long as the thread is not interrupted.
while (!Thread.currentThread().isInterrupted()) {
    /*
     * selector.select(TIMEOUT) is waiting for an OPERATION to be ready and is a blocking call.
     * For example, if a client connects right this second, then it will break from the select()
     * call and run the code below it. The TIMEOUT is not needed, but its just so it doesn't
     * block undefinable.
     */
    selector.select(TIMEOUT);

    /*
     * If we are here, it is because an operation happened (or the TIMEOUT expired).
     * We need to get the SelectionKeys from the selector to see what operations are available.
     * We use an iterator for this.
     */
    Iterator<SelectionKey> keys = selector.selectedKeys().iterator();

    while (keys.hasNext()) {
        SelectionKey key = keys.next();
        // remove the key so that we don't process this OPERATION again.
        keys.remove();

        // key could be invalid if for example, the client closed the connection.
        if (!key.isValid()) {
            continue;
        }
        /*
         * In the server, we start by listening to the OP_ACCEPT when we register with the Selector.
         * If the key from the keyset is Acceptable, then we must get ready to accept the client
         * connection and do something with it. Go read the comments in the accept method.
         */
        if (key.isAcceptable()) {
            System.out.println("Accepting connection");
            accept(key);
        }
        /*
         * If you already read the comments in the accept() method, then you know we changed
         * the OPERATION to OP_WRITE. This means that one of these keys in the iterator will return
         * a channel that is writable (key.isWritable()). The write() method will explain further.
         */
        if (key.isWritable()) {
            System.out.println("Writing...");
            write(key);
        }
        /*
         * If you already read the comments in the write method then you understand that we registered
         * the OPERATION OP_READ. That means that on the next Selector.select(), there is probably a key
         * that is ready to read (key.isReadable()). The read() method will explain further.
         */
        if (key.isReadable()) {
            System.out.println("Reading connection");
            read(key);
        }
    }
}

你可以在这里找到实现源代码

注意:异步服务器

作为非阻塞实现的替代方案,我们可以部署一个异步服务器。例如,你可以使用AsynchronousServerSocketChannel类,它提供了一个用于流式监听套接字的异步通道。

要使用它,首先执行它的静态open()方法,然后将其bind()到特定的端口。接下来,你将执行它的accept()方法,将实现CompletionHandler接口的类传递给它。通常情况下,你会发现这个处理程序被创建为一个匿名内部类

从这个AsynchronousServerSocketChannel对象中,您调用accept()来告诉它开始监听连接,传递给它一个自定义的CompletionHandler实例。当我们调用accept()时,它会立即返回。请注意,这与传统的阻塞方法不同;而accept()方法会阻塞直到有客户端连接AsynchronousServerSocketChannelaccept()方法会为您处理这个过程。
这里有一个示例:
public class NioSocketServer
{
    public NioSocketServer()
    {
        try {
            // Create an AsynchronousServerSocketChannel that will listen on port 5000
            final AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel
                    .open()
                    .bind(new InetSocketAddress(5000));

            // Listen for a new request
            listener.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>()
            {
                @Override
                public void completed(AsynchronousSocketChannel ch, Void att)
                {
                    // Accept the next connection
                    listener.accept(null, this);

                    // Greet the client
                    ch.write(ByteBuffer.wrap("Hello, I am Echo Server 2020, let's have an engaging conversation!\n".getBytes()));

                    // Allocate a byte buffer (4K) to read from the client
                    ByteBuffer byteBuffer = ByteBuffer.allocate(4096);
                    try {
                        // Read the first line
                        int bytesRead = ch.read(byteBuffer).get(20, TimeUnit.SECONDS);

                        boolean running = true;
                        while (bytesRead != -1 && running) {
                            System.out.println("bytes read: " + bytesRead);

                            // Make sure that we have data to read
                            if (byteBuffer.position() > 2) {
                                // Make the buffer ready to read
                                byteBuffer.flip();

                                // Convert the buffer into a line
                                byte[] lineBytes = new byte[bytesRead];
                                byteBuffer.get(lineBytes, 0, bytesRead);
                                String line = new String(lineBytes);

                                // Debug
                                System.out.println("Message: " + line);

                                // Echo back to the caller
                                ch.write(ByteBuffer.wrap(line.getBytes()));

                                // Make the buffer ready to write
                                byteBuffer.clear();

                                // Read the next line
                                bytesRead = ch.read(byteBuffer).get(20, TimeUnit.SECONDS);
                            } else {
                                // An empty line signifies the end of the conversation in our protocol
                                running = false;
                            }
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        // The user exceeded the 20 second timeout, so close the connection
                        ch.write(ByteBuffer.wrap("Good Bye\n".getBytes()));
                        System.out.println("Connection timed out, closing connection");
                    }

                    System.out.println("End of conversation");
                    try {
                        // Close the connection if we need to
                        if (ch.isOpen()) {
                            ch.close();
                        }
                    } catch (I/OException e1)
                    {
                        e1.printStackTrace();
                    }
                }

                @Override
                public void failed(Throwable exc, Void att)
                {
                    ///...
                }
            });
        } catch (I/OException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args)
    {
        NioSocketServer server = new NioSocketServer();
        try {
            Thread.sleep(60000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

你可以在这里找到完整的代码。

1
这里有很多错误和混淆。你一直在说选择器创建事件键,但它并不是这样。它只是更新它们。SelectionKeys 是由 SelectableChannel.register() 创建的。"通常可接受的键在服务器端被创建" 应该改为 "总是"。"首先,我们使用 ServerSocketChannel.open() 方法创建 SocketChannel 的实例" 是不正确的。bind() 不会创建连接。Selector.open() 不执行注册。监听套接字不是 "面向流" 的。异步代码忽略了 write() 调用返回的 Future - user207421
第二个方法为每个传入的连接创建一个线程。通过在完成的方法中添加 System.out.println("New connection added to " + Thread.currentThread().getName()); 进行测试。 - Amir Fo
@Teocci 如何使用异步处理程序来实现对Modbus的调用?就像这里一样:Modbus脉冲线圈 - catch23
为什么你不使用Java的Modbus实现呢?它已经实现了一个高性能、非阻塞、零缓冲复制库。 - Teocci
这个答案应该是一篇独立的文章。 - Daniil
显示剩余2条评论

9
什么是在Java中实现非阻塞套接字的最佳方法?
只有一种方法。使用SocketChannel.configureBlocking(false)
请注意,这些答案中有几个是不正确的。 SocketChannel.configureBlocking(false)将其置于非阻塞模式。您不需要选择器来执行此操作。您只需要一个选择器来实现超时或使用非阻塞套接字的多路I/O。

4
除了使用非阻塞IO,您可能会发现为连接设置一个写入线程更加简单。
注意:如果您只需要几千个连接,则每个连接使用一到两个线程更简单。如果您的服务器每个有大约一万个或更多的连接,则需要NIO和选择器。

1
更容易,但不可扩展。线程太多会消耗程序占用的资源,导致电脑不再喜欢你。但这仍然取决于您个人的需求。 - Jasper Lankhorst
@JasperLankhorst,确实,您可以使用每个连接的1/2线程,但连接数量有限,仅限于几千个。使用选择器,您可以拥有10倍或更多的连接数。 - Peter Lawrey

-1

我刚刚写了这段代码。它运行良好。这是Java NIO的一个示例,正如上面的答案中提到的那样,但在这里我发布了代码。

ServerSocketChannel ssc = null;
try {
    ssc = ServerSocketChannel.open();
    ssc.socket().bind(new InetSocketAddress(port));
    ssc.configureBlocking(false);
    while (true) {
        SocketChannel sc = ssc.accept();
        if (sc == null) {
            // No connections came .
        } else {
            // You got a connection. Do something
        }
    }
} catch (IOException e) {
    e.printStackTrace();
}

6
无法使用。这段代码将在没有连接时执行自旋循环。一个理智的版本应该使用阻塞模式或者选择器(Selector)。 - user207421
1
我认为这个解决方案与阻塞方法非常相似,使用非阻塞方法的想法是创建一个处理程序来处理每当发生该事件时的接受。当您使用此while(true)获取套接字时,就像您阻塞线程直到客户端连接发生一样。 - Teocci
1
通过拥有一个阻塞服务器套接字通道,这将得到改善,同时您仍然可以拥有非阻塞连接。 - Peter Lawrey
1
@Teocci 这就像是在使用 CPU 时吸烟,而不是使用阻塞操作。这确实是非常糟糕的做法。 - user207421

-1

java.nio 包提供了类似于 C 语言的 Selector


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接