Java: 线程池连接阻塞IO与NIO的完整代码示例?

3

好的,我已经疯了。我一直在为我的服务器重写NIO代码,并遇到了一些真正的头痛问题。底线是,正确使用NIO非常困难。有些人指向了Rox教程,网址为http://rox-xmlrpc.sourceforge.net/niotut/,这似乎是一个好的方向,但不够完整。例如,我需要知道如何在排队的传出ByteBuffer发送后仅在服务器端关闭连接。SocketChannel.close()是突然的,如果过早关闭可能会丢失数据。我还需要发送比读取ByteBuffer更大的大数据包。Rox代码(以及其他任何我看过的代码)都没有处理这个问题。还有许多地方似乎未正确处理未捕获的异常。在我的测试中存在一些错误,鉴于NIO的复杂性,不清楚如何正确处理它们。

无论如何,当我试图解决这些问题时,越来越多的棘手细节显现出来,变得非常复杂。因此,我正在考虑完全不同的方法。许多人说NIO非常容易出错,令人困惑和复杂。他们主张使用“每个连接一个线程”的模型,其中每个套接字连接都在自己的线程上运行,使用阻塞IO。这似乎是一个好主意,并且通过为所有连接(如NIO中)使用一个选择器线程来减少前端瓶颈,但代价是更高的开销(用于线程)。这种情绪在帖子中反映出来,例如http://paultyma.blogspot.com/2008/03/writing-java-multithreaded-servers.htmlhttp://mailinator.blogspot.com/2008/02/kill-myth-please-nio-is-not-faster-than.html

与NIO相比,代码应该更简单,但我真的想看一些示例代码。我似乎找不到任何东西。问题在于,我认为这个“每个连接一个线程的阻塞I/O”策略没有更好的名称,我实际上可以获得良好的Google结果。有人能链接我到一些教程或简单的示例,以解释使用这种“旧”的I/O方法并使用线程池扩展它吗?还是有其他的智慧之言?非常感谢!


SocketChannel.close() 不是“突然”的,也不会“丢失数据”,除非您的应用程序尚未发送该数据。离题了。 - user207421
3个回答

2
如果您正在使用NIO,我建议使用框架来帮助您。我一直在使用Apache Mina,并且我会推荐它。
对于阻塞IO,您需要一个监听器线程来接受传入的连接,并生成将处理每个连接的其他线程。以下是这样一个监听器代码的示例,最初由Apache Felix项目提供。如果您需要完整但修改后的版本,可在此浏览源代码
例如:
    /*
    * Licensed to the Apache Software Foundation (ASF) under one or more
    * contributor license agreements.  See the NOTICE file distributed with
    * this work for additional information regarding copyright ownership.
    * The ASF licenses this file to You under the Apache License, Version 2.0
    * (the "License"); you may not use this file except in compliance with
    * the License.  You may obtain a copy of the License at
    *
    *      http://www.apache.org/licenses/LICENSE-2.0
    *
        * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */
    package org.apache.felix.shell.remote;
    
    
    import java.io.IOException;
    import java.io.PrintStream;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.net.SocketException;
    
    
    /**
     * Implements a simple listener that will accept a single connection.
     * <p/>
     *
     * @author Dieter Wimberger (wimpi)
     */
    class Listener
    {
    
        private int m_Port;
        private Thread m_ListenerThread;
        private boolean m_Stop = false;
        private ServerSocket m_ServerSocket;
        private AtomicInteger m_UseCounter;
        private int m_MaxConnections;
    
    
        /**
         * Activates this listener on a listener thread (telnetconsole.Listener).
         */
        public void activate()
        {
            //configure from system property
            try
            {
                m_Port = Integer.parseInt( System.getProperty( "osgi.shell.telnet.port", "6666" ) );
            }
            catch ( NumberFormatException ex )
            {
                Activator.getServices().error( "Listener::activate()", ex );
            }
            try
            {
                m_MaxConnections = Integer.parseInt( System.getProperty( "osgi.shell.telnet.maxconn", "2" ) );
            }
            catch ( NumberFormatException ex )
            {
                Activator.getServices().error( "Listener::activate()", ex );
            }
            m_UseCounter = new AtomicInteger( 0 );
            m_ListenerThread = new Thread( new Acceptor(), "telnetconsole.Listener" );
            m_ListenerThread.start();
        }//activate
    
    
        /**
         * Deactivates this listener.
         * <p/>
         * The listener's socket will be closed, which should cause an interrupt in the
         * listener thread and allow for it to return. The calling thread joins the listener
         * thread until it returns (to ensure a clean stop).
         */
        public void deactivate()
        {
            try
            {
                m_Stop = true;
                //wait for the listener thread
                m_ServerSocket.close();
                m_ListenerThread.join();
            }
            catch ( Exception ex )
            {
                Activator.getServices().error( "Listener::deactivate()", ex );
            }
        }//deactivate
    
        /**
         * Class that implements the listener's accept logic as a <tt>Runnable</tt>.
         */
        private class Acceptor implements Runnable
        {
    
            /**
             * Listens constantly to a server socket and handles incoming connections.
             * One connection will be accepted and routed into the shell, all others will
             * be notified and closed.
             * <p/>
             * The mechanism that should allow the thread to unblock from the ServerSocket.accept() call
             * is currently closing the ServerSocket from another thread. When the stop flag is set,
             * this should cause the thread to return and stop.
             */
            public void run()
            {
                try
                {
                    /*
                        A server socket is opened with a connectivity queue of a size specified
                        in int floodProtection.  Concurrent login handling under normal circumstances
                        should be handled properly, but denial of service attacks via massive parallel
                        program logins should be prevented with this.
                    */
                    m_ServerSocket = new ServerSocket( m_Port, 1 );
                    do
                    {
                        try
                        {
                            Socket s = m_ServerSocket.accept();
                            if ( m_UseCounter.get() >= m_MaxConnections )
                            {
                                //reject with message
                                PrintStream out = new PrintStream( s.getOutputStream() );
                                out.print( INUSE_MESSAGE );
                                out.flush();
                                //close
                                out.close();
                                s.close();
                            }
                            else
                            {
                                m_UseCounter.increment();
                                //run on the connection thread
                                Thread connectionThread = new Thread( new Shell( s, m_UseCounter ) );
                                connectionThread.start();
                            }
                        }
                        catch ( SocketException ex )
                        {
                        }
                    }
                    while ( !m_Stop );
    
                }
                catch ( IOException e )
                {
                    Activator.getServices().error( "Listener.Acceptor::activate()", e );
                }
            }//run
    
        }//inner class Acceptor
    
        private static final String INUSE_MESSAGE = "Connection refused.\r\n"
            + "All possible connections are currently being used.\r\n";
    
    }//class Listener

您可以在这里这里找到其他示例。

请注意,当负载更大时,NIO相对于阻塞模型的优势会发挥作用。从某一点开始,线程创建、管理和上下文切换的额外工作量将限制系统性能。


0

我建议您在JDK中查看样本/nio目录。其中包含许多简单的示例,包括您提到的两个示例。


0

您可能还想考虑使用更高级的框架,例如Grizzly,而不是直接使用NIO。该框架应该允许您专注于您的用例,而不是NIO的细微差别。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接