向所有客户端发送消息(客户端-服务器通信)

16
现在,我正在制作一个基于多线程的客户端服务器应用程序。在服务器端,我为每个接受到的连接创建一个线程。
在线程类中,我编写了一个向客户端发送命令的方法。我想知道的是,如何将参数发送给所有正在运行的客户端?简单来说,我只想让这个服务器向所有连接的客户端发送一条消息。
我已经阅读了此帖子并从此链接找到了sendToAll(String message)方法。但当我尝试在我的代码中使用时,ServerSocket中没有这样的方法。
好的,这是我的服务器和线程的示例代码。
class ServerOne{

ServerSocket server = null;
...

ServerOne(int port){
            System.out.println("Starting server on port "+port);
    try{
      server = new ServerSocket(port);
              System.out.println("Server started successfully and now waiting for client");

    } catch (IOException e) {
      System.out.println("Could not listen on port "+port);
      System.exit(-1);
    }
}

public void listenSocket(){ 
    while(true){
        ClientWorker w;
        try{
            w = new ClientWorker(server.accept());
            Thread t = new Thread(w);
            t.start();
        } catch (IOException e) {
            System.out.println("Accept failed: 4444");
            System.exit(-1);
        }   
    }
}

protected void finalize(){
    try{
        server.close();
    } catch (IOException e) {
        System.out.println("Could not close socket");
        System.exit(-1);
    }
}
}

class ClientWorker implements Runnable{
Socket client;

ClientWorker(Socket client){
    this.client = client;
}
public void run(){
    ...
      sendCommand(parameter);
    ...
}

public void sendCommand(String command){
    PrintWriter out = null;
    try {
        out = new PrintWriter(client.getOutputStream(), true);
        out.println(command);
    } catch (IOException ex) {}
}

}

感谢帮助 :)

3个回答

22
下面的答案不建议用于完整的服务器,对于这种情况,应该使用Java EE与servlets、web服务等。这只适用于少数计算机想要连接以执行特定任务,并且使用简单的Java sockets并不是一个普遍的问题。想想分布式计算或多人游戏。
编辑:我已经更新了这个架构,现在测试和线程安全。任何需要它的人都可以从这里下载它。只需使用(直接或通过子类化)Server和Client,启动它们,一切都准备好了。阅读内联注释以获取更强大的选项。
虽然客户端之间的通信非常复杂,但我会尽可能地简化它。
以下是服务器端的要点:
- 保持已连接客户端列表。 - 定义一个用于服务器输入的线程。 - 定义一个接收到的消息队列。 - 从队列中轮询一个线程,并处理它。 - 一些发送消息的实用方法。
而对于客户端:
- 定义一个用于客户端输入的线程。 - 定义一个接收到的消息队列。 - 从队列中轮询一个线程,并处理它。
下面是Server类:
public class Server {
    private ArrayList<ConnectionToClient> clientList;
    private LinkedBlockingQueue<Object> messages;
    private ServerSocket serverSocket;

    public Server(int port) {
        clientList = new ArrayList<ConnectionToClient>();
        messages = new LinkedBlockingQueue<Object>();
        serverSocket = new ServerSocket(port);

        Thread accept = new Thread() {
            public void run(){
                while(true){
                    try{
                        Socket s = serverSocket.accept();
                        clientList.add(new ConnectionToClient(s));
                    }
                    catch(IOException e){ e.printStackTrace(); }
                }
            }
        };

        accept.setDaemon(true);
        accept.start();

        Thread messageHandling = new Thread() {
            public void run(){
                while(true){
                    try{
                        Object message = messages.take();
                        // Do some handling here...
                        System.out.println("Message Received: " + message);
                    }
                    catch(InterruptedException e){ }
                }
            }
        };

        messageHandling.setDaemon(true);
        messageHandling.start();
    }
    
    private class ConnectionToClient {
        ObjectInputStream in;
        ObjectOutputStream out;
        Socket socket;

        ConnectionToClient(Socket socket) throws IOException {
            this.socket = socket;
            in = new ObjectInputStream(socket.getInputStream());
            out = new ObjectOutputStream(socket.getOutputStream());

            Thread read = new Thread(){
                public void run(){
                    while(true){
                        try{
                            Object obj = in.readObject();
                            messages.put(obj);
                        }
                        catch(IOException e){ e.printStackTrace(); }
                    }
                }
            };

            read.setDaemon(true); // terminate when main ends
            read.start();
        }

        public void write(Object obj) {
            try{
                out.writeObject(obj);
            }
            catch(IOException e){ e.printStackTrace(); }
        }
    }

    public void sendToOne(int index, Object message)throws IndexOutOfBoundsException {
        clientList.get(index).write(message);
    }

    public void sendToAll(Object message){
        for(ConnectionToClient client : clientList)
            client.write(message);
    }

}

接下来是Client类的代码:

public class Client {
    private ConnectionToServer server;
    private LinkedBlockingQueue<Object> messages;
    private Socket socket;

    public Client(String IPAddress, int port) throws IOException{
        socket = new Socket(IPAddress, port);
        messages = new LinkedBlokingQueue<Object>();
        server = new ConnecionToServer(socket);

        Thread messageHandling = new Thread() {
            public void run(){
                while(true){
                    try{
                        Object message = messages.take();
                        // Do some handling here...
                        System.out.println("Message Received: " + message);
                    }
                    catch(InterruptedException e){ }
                }
            }
        };

        messageHandling.setDaemon(true);
        messageHandling.start();
    }

    private class ConnectionToServer {
        ObjectInputStream in;
        ObjectOutputStream out;
        Socket socket;

        ConnectionToServer(Socket socket) throws IOException {
            this.socket = socket;
            in = new ObjectInputStream(socket.getInputStream());
            out = new ObjectOutputStream(socket.getOutputStream());

            Thread read = new Thread(){
                public void run(){
                    while(true){
                        try{
                            Object obj = in.readObject();
                            messages.put(obj);
                        }
                        catch(IOException e){ e.printStackTrace(); }
                    }
                }
            };

            read.setDaemon(true);
            read.start();
        }

        private void write(Object obj) {
            try{
                out.writeObject(obj);
            }
            catch(IOException e){ e.printStackTrace(); }
        }


    }

    public void send(Object obj) {
        server.write(obj);
    }
}

你的答案很好,但是你缺少了几个冒号,并且你应该将 messages.dequeue();messages.enqueue(); 改为 messages.take();messages.put(); - Danon
2
另外,你确定这个类是线程安全的吗?我的意思是,如果我在accept/线程更改clientList的同时调用sendToOne()/sendToAll会怎样? - Danon

3

服务器套接字中没有向所有正在运行的客户端线程发送数据或消息的方法。 请查看ServerThread.java程序,该程序使用服务器调用sendToAll

// ... and have the server send it to all clients
server.sendToAll( message );

2
请查看 zeroMQ。有一些方法被称为“发布订阅”或“pub sub”,可以实现您想要的功能。您还可以使用它在线程之间通信。在我看来,这是一个非常棒的库。它有 Java 或 jzmq 绑定,以及其他 30 多个绑定,因此您应该能够在程序中使用它。 http://www.zeromq.org/

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接