ThreadPoolExecutor.shutdownNow()方法在线程中未抛出InterruptedException异常。

3
我正在实现一个传输服务器程序,它从客户端(通过控制台输入)接收消息,然后将其转发到某种邮箱。
为了允许不同客户端同时接收多个消息,我首先创建了一个实现了Runnable接口的类。这些类实例中的每一个都将处理与恰好一个客户端的通信:
public class ClientConnection implements Runnable {

    //...

    //...

    @Override
    public void run() {
        try {
            // prepare the input reader and output writer
            BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
            PrintWriter writer = new PrintWriter(clientSocket.getOutputStream(), true);

            Message message = new Message();
            String request = "";

            // read client requests
            while ((request = reader.readLine()) != null) {

                System.out.println("Client sent the following request: " + request);
                String response;
                if (request.trim().equals("quit")) {
                    writer.println("ok bye");
                    return;
                }

                response = message.parseRequest(request);
                if (message.isCompleted()) {
                    messagesQueue.put(message);
                    message = new Message();
                }
                writer.println(response);
            }

        } catch (SocketException e) {
            System.out.println("ClientConnection: SocketException while handling socket: " + e.getMessage());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            System.out.println("Client Connection was interrupted!");
            e.printStackTrace();
        } finally {
            if (clientSocket != null && !clientSocket.isClosed()) {
                try {
                    clientSocket.close();
                } catch (IOException ignored) {}
            }

        }

    }
}

我有一个父线程,负责启动和管理所有ClientConnection可运行实例:

@Override
public void run() {

    clientConnectionExecutor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
    while (true) {

        Socket clientSocket;

        try {
            // wait for a Client to connect
            clientSocket = serverSocket.accept();

            ClientConnection clientConnection = new ClientConnection(clientSocket, messagesQueue);
            clientConnectionExecutor.execute(clientConnection);

        } catch (IOException e) {
            // when this exception occurs, it means that we want to shut down everything
            clientConnectionExecutor.shutdownNow();  // force terminate all ClientConnections
            return;
        }
    }
}

根据这个Stackoverflow问题,我本来期望一旦调用shutdownNow();方法,ClientConnection.run()方法中就会抛出InterruptedException并输出Client Connection was interrupted!。但实际上没有发生这种情况,因此异常捕获块似乎从未被执行到,输入读取循环仍在进行。
我在另一个Stackoverflow问题中看到说这可能与代码块中的其他代码行消耗了InterruptedException有关,但是没有具体说明哪行代码会这样做。所以我非常感谢任何提示。
编辑:事实证明,只要在客户端上键入“quit”手动退出循环,循环就会停止,然后会输出Client Connection was interrupted!。因此,在循环运行时异常似乎被忽略,只有在之后才得到处理。
2个回答

1

来自Oracle文档的shutdownNow

除了尽力试图停止正在执行任务的处理之外,没有任何保证。例如,典型的实现将通过Thread.interrupt()取消,因此任何未能响应中断的任务可能永远不会终止。

如果您查看ThreadPoolExecutor源代码,您会发现shutdownNow使用以下代码中断线程:

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }

您的ClientConnection没有检查标志Thread.interrupted。根据帖子中的信息,我无法确定哪个方法会抛出InterruptedException。可能是其他一些方法,例如读取器或写入器的readLine,阻塞了线程,因为它们使用套接字的InputStreamOutputStream,而且显然,如果数据不是立即可用,则套接字流将阻塞线程。

例如,我编写了此代码进行测试:

class Example {
    public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            try(ServerSocket serverSocket = new ServerSocket()) {
                serverSocket.bind(new InetSocketAddress(8080));
                Socket socket = serverSocket.accept();
                int dataByte = socket.getInputStream().read();
                System.out.println(dataByte);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        thread.start();
        thread.interrupt();
    }
}

在OpenJdk-16.0.2上,实际上没有中断。
我看到您的问题有两种可能的解决方案:
1. 如果您确定Socket不会阻塞您的线程,请在while循环内部检查Thread.interrupted。 2. 如果您不确定,请改用非阻塞模式下的SocketChannel手动检查Thread.interrupted。
对于第二种方法,我将我的示例转换为以下内容:
class Example {
    public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            try(ServerSocketChannel serverSocket = ServerSocketChannel.open()) {
                serverSocket.configureBlocking(false);
                serverSocket.bind(new InetSocketAddress(8080));

                SocketChannel socket = null;

                while (socket == null) {
                    socket = serverSocket.accept();

                    if (Thread.interrupted()) {
                        throw new InterruptedException();
                    }
                }

                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                socket.read(byteBuffer);
                byte[] bytes = new byte[byteBuffer.limit()];
                byteBuffer.flip();
                byteBuffer.get(bytes);
                System.out.println(new String(bytes, StandardCharsets.UTF_8));
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                System.out.println("Interrupted successfully");
            }
        });
        thread.start();
        thread.interrupt();
    }
}

它可以正常工作。

祝你在Java方面好运:)


感谢您的详细解释,现在我确信readline()函数会阻塞所有操作。我之前并不知道Java中存在不能被打断的阻塞函数。 所以我按照您的方法,在while循环条件中也检查了Thread.interrupted标志。但是这种方法有一个缺陷,就是似乎需要客户端再按一次回车键才能真正退出客户端。因此,循环条件会比实际情况晚一步判断为false。我将进一步研究SocketChannel类,看看它是否符合我的需求。 - BenjyTec
@BenjyTec 听起来线程在 readLine() 处被阻塞了一次。所以,正如我在答案中所说的,根据我的一点研究,没有办法通过 Socket 来防止这种行为。此外,我建议您在读写 SocketSocketChannel 时要小心,因为这不是一种有效的方式。问题在于您实际上需要一个线程来处理连接。IO 操作可能比业务逻辑操作慢得多。我的建议是在单独的线程中进行读写操作。 - Roggi
1
关于SocketChannel的一般性说明。我强烈建议您使用SocketChannel,因为它可以在阻塞和非阻塞模式下同时运行。因此,在您的代码中,SocketChannelServerSocketChannel总是可以替代旧的SocketServerSocket。此外,您还可以使用ByteBuffer,它允许您轻松实现任何内部缓冲区的功能。如果您不熟悉ByteBuffer,请也看一下它。 - Roggi

1
我原本期望调用shutdownNow()后,ClientConnection.run()中就会抛出InterruptedException异常。
你的messagesQueue应该是一个BlockingQueue。所以当调用messagesQueue.put(message)时,你需要捕获Interrupted异常。只有当线程在put方法中被阻塞(queue已满),调用threadpool#shutdownNow,线程才会收到Interrupted异常。在其他情况下,线程将不会收到这个Interrupted异常。
你可以将while ((request = reader.readLine()) != null)改为while ((request = reader.readLine()) != null && !Thread.interrupted())。
另一种解决方案是维护所有客户端socket,并在需要关闭它们时关闭所有客户端socket,这样客户端线程将直接收到IOException异常。
        List<Socket> clientSockets = new ArrayList<>();
        while (true) {
            try {
                Socket accept = serverSocket.accept();
                clientSockets.add(accept);
                executorService.submit(new ClientConnection(accept));
            }catch (Exception e) {
                for (Socket socket : clientSockets) {
                    try {
                        socket.close();
                    } catch (Exception exception) {
                        //
                    }
                }
                //executorService.shutdownNow();
            }
        }

谢谢,你的第二种方法看起来很有前途。不过我有一个问题,clientSockets 列表会随着时间推移不断膨胀,是吗?因为它将包含已经在数小时前断开连接的客户端的套接字。 - BenjyTec
是的,你应该有一个特殊的类(也许叫做ClientManager..)来维护所有客户端套接字,在客户端关闭时删除相应的套接字。 - zysaaa

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接