多线程套接字通信客户端/服务器

7
我完成了一个客户端/服务器套接字通信程序,它运行良好。现在我正在尝试弄清楚如何使服务器同时拥有多个客户端连接。我已经查看了一些资料,发现有不止一种方法可以实现这个功能。因此,我来这里寻求帮助和建议。
我的服务器:
public class Server {
    private ServerSocket serverSocket = null;
    private Socket clientSocket = null;

    public Server() {
        try {
            serverSocket = new ServerSocket(7003);
        } catch (IOException e) {
            System.err.println("Could not listen on port: 7003");
            System.exit(1);
        }

        try {
            clientSocket = serverSocket.accept();
        } catch (IOException e) {
            System.err.println("Accept failed");
            System.exit(1);
        }
    }

    public void startServer() throws IOException {
        PrintWriter output = new PrintWriter(clientSocket.getOutputStream(), true);
        BufferedReader input = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));

        String inputLine, outputLine;

        outputLine = "Connected to Server";
        output.println(outputLine);

        while ((inputLine = input.readLine()) != null) {
            // This just determines users input and server ruturns output based on that

            outputLine = this.getServerOutput(inputLine);
            output.println(outputLine);

            if (outputLine.equals("Bye"))
                break;
        }

        output.close();
        input.close();
        clientSocket.close();
        serverSocket.close();
    }
}

我需要在构造函数中创建线程并启动 startServer() 还是在运行方法中实现?

4个回答

12

你应该使用ExecutorService。客户端请求处理将是Runnablerun()函数,每次接受请求后可以调用ExecutorService.submit(runnableTask)来异步服务于客户端。

以下是使用ExecutorService的示例。

public class MyServer {

    private static MyServer server; 
    private ServerSocket serverSocket;

    /**
     * This executor service has 10 threads. 
     * So it means your server can process max 10 concurrent requests.
     */
    private ExecutorService executorService = Executors.newFixedThreadPool(10);        

    public static void main(String[] args) throws IOException {
        server = new MyServer();
        server.runServer();
    }

    private void runServer() {        
        int serverPort = 8085;
        try {
            System.out.println("Starting Server");
            serverSocket = new ServerSocket(serverPort); 

            while(true) {
                System.out.println("Waiting for request");
                try {
                    Socket s = serverSocket.accept();
                    System.out.println("Processing request");
                    executorService.submit(new ServiceRequest(s));
                } catch(IOException ioe) {
                    System.out.println("Error accepting connection");
                    ioe.printStackTrace();
                }
            }
        }catch(IOException e) {
            System.out.println("Error starting Server on "+serverPort);
            e.printStackTrace();
        }
    }

    //Call the method when you want to stop your server
    private void stopServer() {
        //Stop the executor service.
        executorService.shutdownNow();
        try {
            //Stop accepting requests.
            serverSocket.close();
        } catch (IOException e) {
            System.out.println("Error in server shutdown");
            e.printStackTrace();
        }
        System.exit(0);
    }

    class ServiceRequest implements Runnable {

        private Socket socket;

        public ServiceRequest(Socket connection) {
            this.socket = connection;
        }

        public void run() {

            //Do your logic here. You have the `socket` available to read/write data.

            //Make sure to close
            try {
                socket.close();
            }catch(IOException ioe) {
                System.out.println("Error closing client connection");
            }
        }        
    }
}

几点评论:你的shutdownNow()不会杀死服务器,因为accept()忽略了中断。在这里关闭服务器套接字是正确的方法。startServer()应该真正成为runServer(),因为它永远不会返回。拥有启动和停止方法是具有误导性的。如果从未使用,则为什么要提交一个Callablesocket.close()应该在try/finally块内。服务器接受循环也是如此。 - Gray
嘿,basiljames,我不知道你是否看到了我下面的最后一篇帖子,但是我在跟随你发布的示例时遇到了一些麻烦。由于我以前从未使用过ExecutorService,所以我该如何将其实现到我的程序中呢? - Nick
@Nick 抱歉没有回复你。我不得不离开家。看起来Gray已经解决了你的疑虑。如果需要任何澄清,请告诉我。 - basiljames
我曾尝试在服务器中使用此代码段,但输出流无法正常工作。`serverSocket = new ServerSocket(serverPort); PrintWriter output = new PrintWriter(socket.getOutputStream(), true);BufferedReader input = new BufferedReader(new InputStreamReader(socket.getInputStream()));while(input.readLine() != null) {output.println(pro.processInput(input.readLine()));}` - Galen Nare

4
如何使得我可以同时拥有多个客户端连接到服务器
现在你正在启动服务器,并立即在构造函数中等待单个客户端连接。
clientSocket = serverSocket.accept();

然后你在startServer()方法中处理单个套接字连接。这意味着不会处理其他客户端。

public void startServer() throws IOException {
    PrintWriter output = new PrintWriter(clientSocket.getOutputStream(), true);
    ...

通常像这样的服务器模式中,您需要执行以下操作:
  1. 在构造函数中设置服务器套接字。
  2. 创建一个acceptClients()方法,它将循环等待客户端被接受。 这可以分叉一个线程以在后台的自己的线程中接受客户端。
  3. 对于每个客户端,要么分叉一个线程来处理连接,将线程传递给客户端套接字。 更好的方法是使用ExecutorService为您管理线程,就像@basiljames所示。
以下是一些示例代码:
public class Server {
    private ServerSocket serverSocket = null;

    public Server(int portNumber) throws IOException {
        serverSocket = new ServerSocket(portNumber);
    }

    // this could be run in a thread in the background
    public void acceptClients() throws IOException {
        // create an open ended thread-pool
        ExecutorService threadPool = Executors.newCachedThreadPool();
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // wait for a client to connect
                Socket clientSocket = serverSocket.accept();
                // create a new client handler object for that socket,
                // and fork it in a background thread
                threadPool.submit(new ClientHandler(clientSocket));
            }
        } finally {
            // we _have_ to shutdown the thread-pool when we are done
            threadPool.shutdown();
        }
    }

    // if server is running in background, you stop it by killing the socket
    public void stop() throws IOException {
        serverSocket.close();
    }

    // this class handles each client connection
    private static class ClientHandler implements Runnable {
        private final Socket clientSocket;
        public ClientHandler(Socket clientSocket) {
            this.clientSocket = clientSocket;
        }
        public void run() {
            // use the client socket to handle the client connection
            ...
        }
    }
}

建议使用ExecutorService线程池来实现几乎所有的Thread实现。如果您因为某些原因而必须使用原始的Thread,则可以在acceptClients()方法中按照以下方式进行操作:

    public void acceptClients() throws IOException {
        while (!Thread.currentThread().isInterrupted()) {
            // wait for a client to connect
            Socket clientSocket = serverSocket.accept();
            // fork a background client thread
            new Thread(new ClientHandler(clientSocket)).start();
        }
    }

我该如何分叉线程以处理每个客户端连接?我只需使用新的Thread(new Runnable())启动新线程并将Client socket作为参数传递吗?我认为这是我需要采取的方法,因为我不太理解ExecutorService。 - Nick
花点时间去理解ExecutorService@Nick。它被推荐作为大多数new Thread()场景的替代品。话虽如此,我已经在我的答案中添加了如何使用原始线程的内容。 - Gray
我肯定会阅读ExecutorService相关的内容,但现在被要求处理线程。而且,由于所有这些都将从单个客户端程序运行,我是否能够指定要创建多少个线程呢?非常感谢您的帮助,我是初学者,正在尝试理解它。 - Nick
这将为每个客户端创建1个线程。如果您想控制同时连接到服务器的客户端数量,则这将更加困难。使用Executors.newFixedThreadPool(numThreads)来定义您的线程池将限制线程数,但不会限制连接数。 - Gray
基本上,我要求服务器执行一项低负载操作和一项高负载操作。 低负载是要求它返回当前日期和时间,而高负载则是类似于空闲内存,并测量执行每个操作所需的时间。 我想做的是创建客户端线程,以便每个线程/客户端都可以对这两个操作进行自己的测量。 - Nick
@Nick,它起作用了吗?如果我的回答有帮助,请务必点赞并在适当的情况下接受它。 - Gray

2

将这个修改: public void startServer() throws IOException 修改为: public void startServer(Socket clientSocket) throws IOException

接下来你只需要:

public Server()
{
    try
    {
        serverSocket = new ServerSocket(7003);
    }
    catch (IOException e)
    {
        System.err.println("Could not listen on port: 7003");
        System.exit(1);
    }

    try
    {
        while(true) {
            final Socket socket = serverSocket.accept();
            new Thread(new Runnable() {
                public void run() {
                    try {
                        startServer(socket);
                    } catch(IOException e) {e.printStackTrace();}
                }
            }).start();
        }
    }
    catch(IOException e)
    {
        System.err.println("Accept failed");
        System.exit(1);
    }
}

最后,您可以删除private Socket clientSocket = null; 这样应该就可以了。或者至少非常接近了。

这个可以工作,但是有没有办法让客户端指定他们想要运行多少个线程?例如,目前我只有多个客户端程序在运行,但我想让它变成只有一个客户端程序在运行,但可以创建多个线程,所有线程都执行相同的功能。 - Nick
是的,这就是线程池管理。看一下@basiljames的回复。基本上,你创建一个池:ExecutorService pool = Executors.newFixedThreadPool(5); 然后将工作提交给它,而不是创建一个新线程。 - mprivat
我在跟随他发布的代码时遇到了麻烦,但是你发布的代码我可以理解。你能否向我展示如何将那段代码转换为我手头的代码呢?再次感谢! - Nick

0
private static final int SERVER_PORT = 35706;
private ServerSocket serverSocket;
private final ArrayList<ClientThread> activeClients = new ArrayList<>();

public void startServer() {

    try {
        serverSocket = new ServerSocket(SERVER_PORT);
        
        final ExecutorService clientPool = Executors.newCachedThreadPool();

        while (!serverSocket.isClosed()) {

            try {
                Future<Socket> future = clientPool.submit(() -> {
                       Socket socket = serverSocket.accept();
                       ClientThread clientThread= new ClientThread(socket);
                       return (socket);
                });

                activeClients.add(future.get());
            } catch (IOException e) {
                clientPool.shutdownNow();
                System.out.println(e.getMessage());
            } catch (InterruptedException | ExecutionException e) {
                System.out.println(e.getMessage());
            }
        }

    } catch (IOException e) {
        System.out.println(e.getMessage());
    }
}



public void stopServer() {  

   try {
        serverSocket.close();
        activeClients.forEach(socket -> {
            try {
                socket.close();
            } catch (IOException e) {
                System.out.println(e.getMessage());
            }
        });
            
   } catch (IOException ex) {
        System.out.println(e.getMessage());
   }

}



private static class ClientThread implements Runnable{
    private final Socket socket;

    public ClientThread(Socket socket) throws IOException {
       this.socket = socket;
    }
        
    @Override
    public void run() {
        /* Your implementation */
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接