使用非阻塞套接字进行多线程

3
我正在尝试使用nio在Java中实现TCP服务器。简单地使用Selector的select方法获取已准备好的键,然后,如果它们是可接受的、可读的等,则处理这些键。当我只使用一个线程时,服务器运行得很正常。但是,当我尝试使用更多线程来处理键时,服务器的响应变慢,最终停止响应,比如在4-5个请求后。这就是我所做的一切(伪代码)。
Iterator<SelectionKey> keyIterator =  selector.selectedKeys().iterator();
while (keyIterator.hasNext()) {
                SelectionKey readyKey = keyIterator.next();
                if (readyKey.isAcceptable()) {
                    //A new connection attempt, registering socket channel with selector

                } else {
                    Worker.add( readyKey );
                }

Worker是执行通道输入/输出的线程类。

以下是我的Worker类代码:

private static List<SelectionKey> keyPool = Collections.synchronizedList(new LinkedList());

public static void add(SelectionKey key) {
    synchronized (keyPool) {
        keyPool.add(key);
        keyPool.notifyAll();
    }
}


public void run() {
    while ( true ) {

        SelectionKey myKey = null;
        synchronized (keyPool) {
            try {
                while (keyPool.isEmpty()) {
                    keyPool.wait();
                }
            } catch (InterruptedException ex) {                    
            }
            myKey = keyPool.remove(0);
            keyPool.notifyAll();
        }

        if (myKey != null && myKey.isValid() ) {

            if (myKey.isReadable()) {
                //Performing reading
            } else if (myKey.isWritable()) {
                //performing writing
                myKey.cancel();  
            }
        }
    }

我的基本想法是将密钥添加到密钥池中,各个线程可以依次获取一个密钥。 我的BaseServer类本身作为一个线程运行。 在事件循环开始之前,它创建了10个Worker线程。 我还尝试提高BaseServer线程的优先级,以便它有更多机会接受可以接受的密钥。 但是,大约在8个请求后,它就停止响应了。 请帮忙看看,我做错了什么。 预先感谢 :)


请查看以下有关生产者/消费者问题和一些好的数据结构的想法,这些可能有助于解决您的问题 - https://dev59.com/j3M_5IYBdhLWcg3w02z8。 - Perception
3个回答

2

第三点,你没有从选定的键集中删除任何内容。你必须在每次循环中执行此操作,例如在调用next()之后调用keyIterator.remove()。

你需要阅读NIO教程。


1

尝试使用xsocket库。它为我节省了大量阅读论坛的时间。

下载链接:http://xsocket.org/

教程链接:http://xsocket.sourceforge.net/core/tutorial/V2/TutorialCore.htm

服务器代码:

import org.xsocket.connection.*;

/**
 *
 * @author wsserver
 */
public class XServer {

    protected static IServer server;

    public static void main(String[] args) {
        try {
            server = new Server(9905, new XServerHandler());
            server.start();
        } catch (Exception ex) {
            System.out.println(ex.getMessage());
        }
    }
     protected static void shutdownServer(){
        try{
            server.close();
        }catch(Exception ex){
            System.out.println(ex.getMessage());
        }        
    }
}

服务器处理程序:
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.util.*;
import org.xsocket.*;
import org.xsocket.connection.*;

public class XServerHandler implements IConnectHandler, IDisconnectHandler, IDataHandler {

    private Set<ConnectedClients> sessions = Collections.synchronizedSet(new HashSet<ConnectedClients>());

    Charset charset = Charset.forName("ISO-8859-1");
    CharsetEncoder encoder = charset.newEncoder();
    CharsetDecoder decoder = charset.newDecoder();
    ByteBuffer buffer = ByteBuffer.allocate(1024);

    @Override
    public boolean onConnect(INonBlockingConnection inbc) throws IOException, BufferUnderflowException, MaxReadSizeExceededException {
        try {
            synchronized (sessions) {
                sessions.add(new ConnectedClients(inbc, inbc.getRemoteAddress()));
            }
            System.out.println("onConnect"+" IP:"+inbc.getRemoteAddress().getHostAddress()+" Port:"+inbc.getRemotePort());
        } catch (Exception ex) {
            System.out.println("onConnect: " + ex.getMessage());
        }
        return true;
    }

    @Override
    public boolean onDisconnect(INonBlockingConnection inbc) throws IOException {
        try {
            synchronized (sessions) {
                sessions.remove(inbc);
            }
            System.out.println("onDisconnect");
        } catch (Exception ex) {
            System.out.println("onDisconnect: " + ex.getMessage());
        }
        return true;
    }

    @Override
    public boolean onData(INonBlockingConnection inbc) throws IOException, BufferUnderflowException, ClosedChannelException, MaxReadSizeExceededException {
        inbc.read(buffer);
        buffer.flip();
        String request = decoder.decode(buffer).toString();
        System.out.println("request:"+request);
        buffer.clear();
        return true;
    }
}

已连接客户端:

import java.net.InetAddress;
import org.xsocket.connection.INonBlockingConnection;

/**
 *
 * @author wsserver
 */
public class ConnectedClients {

    private INonBlockingConnection inbc;
    private InetAddress address;

    //CONSTRUCTOR
    public ConnectedClients(INonBlockingConnection inbc, InetAddress address) {
        this.inbc = inbc;
        this.address = address;
    }

    //GETERS AND SETTERS
    public INonBlockingConnection getInbc() {
        return inbc;
    }

    public void setInbc(INonBlockingConnection inbc) {
        this.inbc = inbc;
    }

    public InetAddress getAddress() {
        return address;
    }

    public void setAddress(InetAddress address) {
        this.address = address;
    }
}

客户端代码:

import java.net.InetAddress;
import org.xsocket.connection.INonBlockingConnection;
import org.xsocket.connection.NonBlockingConnection;

/**
 *
 * @author wsserver
 */
public class XClient {

    protected static INonBlockingConnection inbc;
    public static void main(String[] args) {
        try {
            inbc = new NonBlockingConnection(InetAddress.getByName("localhost"), 9905, new XClientHandler());

            while(true){

            }
        } catch (Exception ex) {
            System.out.println(ex.getMessage());
        }
    }
}

客户端处理程序:
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import org.xsocket.MaxReadSizeExceededException;
import org.xsocket.connection.IConnectExceptionHandler;
import org.xsocket.connection.IConnectHandler;
import org.xsocket.connection.IDataHandler;
import org.xsocket.connection.IDisconnectHandler;
import org.xsocket.connection.INonBlockingConnection;

/**
 *
 * @author wsserver
 */
public class XClientHandler implements IConnectHandler, IDataHandler,IDisconnectHandler, IConnectExceptionHandler {

    Charset charset = Charset.forName("ISO-8859-1");
    CharsetEncoder encoder = charset.newEncoder();
    CharsetDecoder decoder = charset.newDecoder();
    ByteBuffer buffer = ByteBuffer.allocate(1024);

    @Override
    public boolean onConnect(INonBlockingConnection nbc) throws IOException {
        System.out.println("Connected to server");
        nbc.write("hello server\r\n");
        return true;
    }

    @Override
    public boolean onConnectException(INonBlockingConnection nbc, IOException ioe) throws IOException {

        System.out.println("On connect exception:"+ioe.getMessage());
        return true;
    }

    @Override
    public boolean onDisconnect(INonBlockingConnection nbc) throws IOException {

        System.out.println("Dissconected from server");
        return true;
    }

    @Override
    public boolean onData(INonBlockingConnection inbc) throws IOException, BufferUnderflowException, ClosedChannelException, MaxReadSizeExceededException {

        inbc.read(buffer);
        buffer.flip();
        String request = decoder.decode(buffer).toString();
        System.out.println(request);
        buffer.clear();
        return true;
    }
}

我在论坛上花了很多时间阅读相关内容,希望我的代码能对你有所帮助。


1
首先,您现在不应该真的再使用wait()和notify()调用了,因为java.util.concurrent中存在很好的标准Java(1.5+)包装器类,例如BlockingQueue
其次,建议在选择线程本身中进行IO,而不是在工作线程中进行。工作线程应该只将读/写排队到选择器线程中。
这个页面解释得很好,甚至提供了一个简单TCP/IP服务器的工作代码示例:http://rox-xmlrpc.sourceforge.net/niotut/ 抱歉,我现在还没有时间查看您的具体示例。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接