在Java中实现非阻塞套接字的最佳方法是什么?
或者有这样的东西吗?我有一个通过套接字与服务器通信的程序,但如果数据/连接存在问题,我不希望套接字调用被阻止/导致延迟。
在Java中实现非阻塞套接字的最佳方法是什么?
或者有这样的东西吗?我有一个通过套接字与服务器通信的程序,但如果数据/连接存在问题,我不希望套接字调用被阻止/导致延迟。
final String HOSTNAME = "127.0.0.1";
final int PORT = 8511;
// This is how you open a ServerSocketChannel
serverChannel = ServerSocketChannel.open();
// You MUST configure as non-blocking or else you cannot register the serverChannel to the Selector.
serverChannel.configureBlocking(false);
// bind to the address that you will use to Serve.
serverChannel.socket().bind(new InetSocketAddress(HOSTNAME, PORT));
// This is how you open a Selector
selector = Selector.open();
/*
* Here you are registering the serverSocketChannel to accept connection, thus the OP_ACCEPT.
* This means that you just told your selector that this channel will be used to accept connections.
* We can change this operation later to read/write, more on this later.
*/
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
ServerSocketChannel.open()
方法创建一个SocketChannel
实例。接下来,调用configureBlocking(false)
将该channel
设置为非阻塞模式。通过serverChannel.socket().bind()
方法与服务器建立连接。其中,HOSTNAME
表示服务器的IP地址,PORT
表示通信端口。最后,调用Selector.open()
方法创建一个selector
实例,并将其注册到channel
和注册类型上。在本例中,注册类型为OP_ACCEPT
,表示选择器仅报告客户端尝试连接到服务器。其他可能的选项有:OP_CONNECT
(用于客户端)、OP_READ
和OP_WRITE
。// Run the server as long as the thread is not interrupted.
while (!Thread.currentThread().isInterrupted()) {
/*
* selector.select(TIMEOUT) is waiting for an OPERATION to be ready and is a blocking call.
* For example, if a client connects right this second, then it will break from the select()
* call and run the code below it. The TIMEOUT is not needed, but its just so it doesn't
* block undefinable.
*/
selector.select(TIMEOUT);
/*
* If we are here, it is because an operation happened (or the TIMEOUT expired).
* We need to get the SelectionKeys from the selector to see what operations are available.
* We use an iterator for this.
*/
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = keys.next();
// remove the key so that we don't process this OPERATION again.
keys.remove();
// key could be invalid if for example, the client closed the connection.
if (!key.isValid()) {
continue;
}
/*
* In the server, we start by listening to the OP_ACCEPT when we register with the Selector.
* If the key from the keyset is Acceptable, then we must get ready to accept the client
* connection and do something with it. Go read the comments in the accept method.
*/
if (key.isAcceptable()) {
System.out.println("Accepting connection");
accept(key);
}
/*
* If you already read the comments in the accept() method, then you know we changed
* the OPERATION to OP_WRITE. This means that one of these keys in the iterator will return
* a channel that is writable (key.isWritable()). The write() method will explain further.
*/
if (key.isWritable()) {
System.out.println("Writing...");
write(key);
}
/*
* If you already read the comments in the write method then you understand that we registered
* the OPERATION OP_READ. That means that on the next Selector.select(), there is probably a key
* that is ready to read (key.isReadable()). The read() method will explain further.
*/
if (key.isReadable()) {
System.out.println("Reading connection");
read(key);
}
}
}
你可以在这里找到实现源代码
作为非阻塞实现的替代方案,我们可以部署一个异步服务器。例如,你可以使用AsynchronousServerSocketChannel
类,它提供了一个用于流式监听套接字的异步通道。
要使用它,首先执行它的静态open()
方法,然后将其bind()
到特定的端口。接下来,你将执行它的accept()
方法,将实现CompletionHandler
接口的类传递给它。通常情况下,你会发现这个处理程序被创建为一个匿名内部类。
AsynchronousServerSocketChannel
对象中,您调用accept()
来告诉它开始监听连接,传递给它一个自定义的CompletionHandler
实例。当我们调用accept()
时,它会立即返回。请注意,这与传统的阻塞方法不同;而accept()
方法会阻塞直到有客户端连接,AsynchronousServerSocketChannel
的accept()
方法会为您处理这个过程。public class NioSocketServer
{
public NioSocketServer()
{
try {
// Create an AsynchronousServerSocketChannel that will listen on port 5000
final AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel
.open()
.bind(new InetSocketAddress(5000));
// Listen for a new request
listener.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>()
{
@Override
public void completed(AsynchronousSocketChannel ch, Void att)
{
// Accept the next connection
listener.accept(null, this);
// Greet the client
ch.write(ByteBuffer.wrap("Hello, I am Echo Server 2020, let's have an engaging conversation!\n".getBytes()));
// Allocate a byte buffer (4K) to read from the client
ByteBuffer byteBuffer = ByteBuffer.allocate(4096);
try {
// Read the first line
int bytesRead = ch.read(byteBuffer).get(20, TimeUnit.SECONDS);
boolean running = true;
while (bytesRead != -1 && running) {
System.out.println("bytes read: " + bytesRead);
// Make sure that we have data to read
if (byteBuffer.position() > 2) {
// Make the buffer ready to read
byteBuffer.flip();
// Convert the buffer into a line
byte[] lineBytes = new byte[bytesRead];
byteBuffer.get(lineBytes, 0, bytesRead);
String line = new String(lineBytes);
// Debug
System.out.println("Message: " + line);
// Echo back to the caller
ch.write(ByteBuffer.wrap(line.getBytes()));
// Make the buffer ready to write
byteBuffer.clear();
// Read the next line
bytesRead = ch.read(byteBuffer).get(20, TimeUnit.SECONDS);
} else {
// An empty line signifies the end of the conversation in our protocol
running = false;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
// The user exceeded the 20 second timeout, so close the connection
ch.write(ByteBuffer.wrap("Good Bye\n".getBytes()));
System.out.println("Connection timed out, closing connection");
}
System.out.println("End of conversation");
try {
// Close the connection if we need to
if (ch.isOpen()) {
ch.close();
}
} catch (I/OException e1)
{
e1.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Void att)
{
///...
}
});
} catch (I/OException e) {
e.printStackTrace();
}
}
public static void main(String[] args)
{
NioSocketServer server = new NioSocketServer();
try {
Thread.sleep(60000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
SocketChannel.configureBlocking(false)
。我刚刚写了这段代码。它运行良好。这是Java NIO的一个示例,正如上面的答案中提到的那样,但在这里我发布了代码。
ServerSocketChannel ssc = null;
try {
ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress(port));
ssc.configureBlocking(false);
while (true) {
SocketChannel sc = ssc.accept();
if (sc == null) {
// No connections came .
} else {
// You got a connection. Do something
}
}
} catch (IOException e) {
e.printStackTrace();
}
Selector
)。 - user207421java.nio 包提供了类似于 C 语言的 Selector。
SelectionKeys
是由SelectableChannel.register()
创建的。"通常可接受的键在服务器端被创建" 应该改为 "总是"。"首先,我们使用 ServerSocketChannel.open() 方法创建 SocketChannel 的实例" 是不正确的。bind()
不会创建连接。Selector.open()
不执行注册。监听套接字不是 "面向流" 的。异步代码忽略了write()
调用返回的Future
。 - user207421System.out.println("New connection added to " + Thread.currentThread().getName());
进行测试。 - Amir Fo