如何在NIO服务器上保持每个通道的数据

4
我有一个Java NIO服务器,用于接收客户端的数据。
当通道准备好读取时,即key.isReadable()返回true时,将调用read(key)来读取数据。
目前,我正在为所有通道使用单个读取缓冲区,并在read()方法中清除缓冲区并读入数据,最后将其放入字节数组中,假设我会一次性获得所有数据。
但是,假设我没有一次性获取完整的数据(我在数据结尾处有特殊字符进行检测)。
问题:
现在如何将这些部分数据保留在通道中,或如何处理部分读取问题?还是全局的?
我在某个地方读到附件不好。

1
如果您意识到可能存在的内存泄漏问题,则附件并没有任何问题。 - John
1
你可能想要查看Netty库,它使NIO更易于使用:http://netty.io/ - Wim Deblauwe
值得检查一下这个库,它非常不错。Netty也可以嵌入到Tomcat中。 - John
2个回答

6
请看Reactor模式。以下是Doug Lea教授的基本实现链接:http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
这个模式的思想是有一个单独的反应器线程,该线程在选择器调用上阻塞。一旦IO事件就绪,反应器线程将事件分派到适当的处理程序。在上述pdf中,反应器内部有一个接受新连接的Acceptor内部类。
作者使用单个处理程序来处理读写事件并维护此处理程序的状态。我更喜欢为读取和写入分别使用不同的处理程序,但这不像“状态机”那样易于使用。每个事件只能有一个附件,因此需要某种注入来切换读/写处理程序。
为了在后续的读/写之间保持状态,您需要做几件事情:
-引入自定义协议以告诉您何时完全读取消息 -对于过期的连接,具有超时或清除机制 -维护特定于客户端的会话
因此,您可以执行以下操作:
public class Reactor implements Runnable{

    Selector selector = Selector.open();

    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

    public Reactor(int port) throws IOException {

        serverSocketChannel.socket().bind(new InetSocketAddress(port));

        serverSocketChannel.configureBlocking(false);

        // let Reactor handle new connection events
        registerAcceptor();

    }

    /**
     * Registers Acceptor as handler for new client connections.
     * 
     * @throws ClosedChannelException
     */
    private void registerAcceptor() throws ClosedChannelException {


        SelectionKey selectionKey0 = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        selectionKey0.attach(new Acceptor());
    }

    @Override
    public void run(){

        while(!Thread.interrupted()){

            startReactorLoop();

        }

    }

    private void startReactorLoop() {

        try {

            // wait for new events for each registered or new clients
            selector.select();

            // get selection keys for pending events
            Set<SelectionKey> selectedKeys = selector.selectedKeys();

            Iterator<SelectionKey> selectedKeysIterator = selectedKeys.iterator();

            while (selectedKeysIterator.hasNext()) {

                // dispatch even to handler for the given key
                dispatch(selectedKeysIterator.next());

                // remove dispatched key from the collection
                selectedKeysIterator.remove();
            }

        } catch (IOException e) {
            // TODO add handling of this exception
            e.printStackTrace();
        }
    }

    private void dispatch(SelectionKey interestedEvent) {

        if (interestedEvent.attachment() != null) {

            EventHandler handler = (EventHandler) interestedEvent.attachment();

            handler.processEvent();
        }

    }

    private class Acceptor implements EventHandler {

        @Override
        public void processEvent() {

            try {

                SocketChannel clientConnection = serverSocketChannel.accept();

                if (clientConnection != null) {

                    registerChannel(clientConnection);

                }

            } catch (IOException e) {e.printStackTrace();}

        }
    /**
     *  Save Channel - key association - in Map perhaps.
     * This is required for subsequent/partial reads/writes
     */
    private void registerChannel(SocketChannel clientChannel) {


        // notify injection mechanism of new connection (so it can activate Read Handler)
}

一旦读取事件被处理,通知注入机制可以注入写入处理程序。

当有新的连接可用时,注入机制会创建读取和写入处理程序的新实例。此注入机制根据需要切换处理程序。每个通道的处理程序查找是通过在连接接受时由方法 `registerChannel()` 填充的 Map 进行的。

读取和写入处理程序具有 ByteBuffer 实例,并且由于每个 Socket 通道都有自己的一对处理程序,因此现在可以在部分读取和写入之间维护状态。

提高性能的两个提示:

  • 在连接被接受时立即尝试进行第一次读取。仅当您的自定义协议中的头部定义的数据不足时,才为读取事件注册通道兴趣。

  • 尝试先写入而不注册写入事件的兴趣,只有当您没有写入所有数据时,才注册写入兴趣。

这将减少选择器唤醒次数。

类似于这样:

SocketChannel socketChannel;

byte[] outData;

final static int MAX_OUTPUT = 1024;

ByteBuffer output = ByteBuffer.allocate(MAX_OUTPUT);

// if message was not written fully
if (socketChannel.write(output) < messageSize()) {

// register interest for write event
SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_WRITE); 
        selectionKey.attach(writeHandler);
        selector.wakeup();

最后,应该设置定时任务来检查连接是否仍然存活/选择键是否被取消。如果客户端断开TCP连接,服务器通常不会知道这一点。因此,将有许多事件处理程序绑定为过期的连接附件,这将导致内存泄漏。

这就是为什么你可能会说附件不好,但是这个问题可以处理。

要解决这个问题,有两种简单的方法:

  • 启用TCP保持活动功能

  • 定期任务可以检查给定通道上最后一次活动的时间戳。如果它闲置太长时间,服务器应该终止连接。


我已经有一个不使用处理程序的实现,但是关于附件的想法仍然是一样的。感谢您的好解释。 - cruxion effux
其中一些是城市传说。如果客户端断开连接,服务器将收到一个OP_READ,然后读取,并得到一个返回值为-1。此时应关闭通道,这将取消密钥并从所有密钥集中删除它。当您拥有选择器本身并且可以在其处理循环结束时提供超时和清理活动时,就不需要单独的任务。在NIO中,线程越少越好。 - user207421
通过选择器线程进行清理的建议非常好。如果客户端失去连接并且无法发送FIN段,服务器是否仍会收到OP_READ? - John

1

有人在亚马逊的一篇古老而且非常不准确的NIO博客中错误地声称关键附件是内存泄漏,这完全是胡说八道,甚至不合逻辑。这也是他断言你需要各种补充队列的博客之一。在大约13年的NIO使用中从未遇到过这种情况。

你所需要的是每个通道一个ByteBuffer,或者可能是两个,一个用于读取,一个用于写入。你可以将单个缓冲区作为附件本身存储:如果您想要两个,或者有其他数据需要存储,则需要定义一个包含两个缓冲区和任何您想要与通道关联的其他数据(例如客户端凭据)的Session类,并使用Session对象作为附件。

你真的无法通过一个单一的缓冲区为所有通道提供很多帮助。


这个是不是你要找的?https://jfarcand.wordpress.com/2006/07/06/tricks-and-tips-with-nio-part-ii-why-selectionkey-attach-is-evil/ - Water
@Water:是的,这就是适合我的。 - cruxion effux
有个博客是亚马逊的某位写的,但我不确定它是否是古老的,因为我还没有看到谈论内存泄漏的部分。http://rox-xmlrpc.sourceforge.net/niotut/index.html - crakama

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接