我该如何使用AsynchronousServerSocketChannel来接受连接?

22

我想使用Java 7和NIO 2编写一个异步服务器。

但是如何使用AsynchronousServerSocketChannel

比如,如果我从以下内容开始:

final AsynchronousServerSocketChannel server = 
    AsynchronousServerSocketChannel.open().bind(
        new InetSocketAddress(port));

当我执行server.accept()时,程序会异步终止。如果我将该代码放入一个无限循环中,将抛出AcceptPendingException异常。有没有关于如何使用AsynchronousServerSocketChannel编写简单的异步服务器的建议?以下是我的完整示例(类似于JavaDoc中的示例):
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class AsyncServer {

    public static void main(String[] args) {
        int port = 8060;
        try {
            final AsynchronousServerSocketChannel server = 
                    AsynchronousServerSocketChannel.open().bind(
                            new InetSocketAddress(port));

            System.out.println("Server listening on " + port);

            server.accept("Client connection", 
                    new CompletionHandler<AsynchronousSocketChannel, Object>() {
                public void completed(AsynchronousSocketChannel ch, Object att) {
                    System.out.println("Accepted a connection");

                    // accept the next connection
                    server.accept("Client connection", this);

                    // handle this connection
                    //TODO handle(ch);
                }

                public void failed(Throwable exc, Object att) {
                    System.out.println("Failed to accept connection");
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

你可以使用专门用于客户端-服务器应用程序的Netty框架。它还使用Java NIO。这是快速开发服务器的简单方法。请访问http://netty.io/。 - ajay_t
8
@Optimus: 我知道Netty,但它与这个问题无关。 - Jonas
4个回答

18

你做得很好,从已完成的回调中调用accept()以接受更多连接是正确的方法。

一个简单(但不太好看)的方法来防止线程终止就是简单地循环,直到线程被中断。

// yes, sleep() is evil, but sometimes I don't care
while (true) {
    Thread.sleep(1000);
}

更简洁的方法是使用 AsynchronousChannelGroup。例如:
AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(Executors
            .newSingleThreadExecutor());
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group).bind(
            new InetSocketAddress(port));

// (insert server.accept() logic here)

// wait until group.shutdown()/shutdownNow(), or the thread is interrupted:
group.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);

你可以调整线程的处理方式,详细信息请参阅AsynchronousChannelGroup API文档

7
你应该关注这个问题。虽然sleep()有时会很方便,但它通常不是最优的解决方案。 - user238033
7
呵呵。我一直在因为sleep()调用而被踩票,即使我已经称它为“丑陋”和“邪恶”,并展示了一种更好的方法。这似乎相当教条主义。 :-) - Soulman

4

如果您在同一个线程中有其他事情要做,使用异步接受会很有用。但在您的情况下,您没有做其他的事情,所以我建议使用

while(true) {
    AsynchronousSocketChannel socket = server.accept().get();
    System.out.println("Accepted " + socket);
    socket.close();
}

@gnarly它确实可以,但是accept()每次返回一个不同的Future。 - Peter Lawrey
1
我的观点是你的例子是一个阻塞服务器,这背离了 AsynchronousServerSocketChannel 的初衷。 - user238033
1
在异步服务器中,您不应该使用多个线程,这就是它的美妙之处,操作系统会在内部处理所有内容,并在需要进行IO时通知您,因此您永远不应该有一个等待它的线程。使用 CompletionHandler,它会自动监听要接受的连接。 - user238033
除了Java使用ExecutorService来执行读写并返回Futures之外,它并不像这样使用操作系统来执行异步操作(除非是InfiniBand)。 ;) - Peter Lawrey
@gnarly 有多个实现方式。默认使用 ExecutorService,但我想 Infini-band 支持意味着它有自己的实现方式,但我没有使用过。 - Peter Lawrey
显示剩余2条评论

1
另一种选择是让主方法在返回前等待信号。如果有某种外部关闭命令,则只需通知该信号,主线程即可关闭。
private static final Object shutdownSignal = new Object();

public static void main(String[] args) {

    ...

    synchronized (shutdownSignal) {
        try {
            shutdownSignal.wait();
        }
        catch (InterruptedException e) {
            // handle it!
        }
    }
}

我不会使用“Object”作为标识符,也不会使用布尔值,但不是布尔类,因为它会导致奇怪的副作用(请阅读源代码了解原因)。 最佳解决方案: ~ 在私有final对象上同步,专门为此目的指定(如果其他人可能会扩展我们的类,则更整洁) ~ 使用get和set方法替换shutdownSignal,使用final AtomicBoolean进行更改而不是赋值(您仍需要同步以测试状态)来源:https://telliott.io/node/40(为什么不要在同步中使用布尔值) - Jasper Lankhorst
@JasperLankhorst,你的评论毫无意义。这正是wait/notify的使用情况。由于此处没有赋值操作,因此不可能出现源文章中描述的奇怪副作用。这只是一种信号机制。 - dOxxx

-2
请使用如下示例所示的倒计时锁(CountDownLatch)。
    final AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open();
    InetSocketAddress address = new InetSocketAddress(port);
    serverChannel.bind(address);
    final CountDownLatch latch = new CountDownLatch(1);
    serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
        public void completed(final AsynchronousSocketChannel channel, Object attachment) {
            serverChannel.accept(null, this);
                        }

});
try {
        latch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
        Thread.currentThread().interrupt();
    }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接