Java多线程:CPU利用率过高

4
我正在使用一个服务,从Kafka读取消息并将其推送到Cassandra。
我正在使用线程架构来实现相同的功能。
假设有k个线程从Kafka主题中消费。这些线程将写入一个声明为以下内容的队列:
public static BlockingQueue<>

现在有许多线程,比如 n 条线程,它们要写入Cassandra。这是执行此操作的代码:
public void run(){
    LOGGER.log(Level.INFO, "Thread Created: " +Thread.currentThread().getName());
    while (!Thread.currentThread().isInterrupted()) {
        Thread.yield();
        if (!content.isEmpty()) {
            try {
                JSONObject msg = content.remove();
                // JSON
                for(String tableName : tableList){
                    CassandraConnector.getSession().execute(createQuery(tableName, msg));
                }
            } catch (Exception e) {

            }
        }
    }
}

content是用于读写操作的阻塞队列。

我在线程实现中扩展了Thread类,有一定数量的线程在不被中断的情况下继续执行。

问题是,这使用了太多的CPU。以下是top命令的第一行:

  PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
46232 vishran+  20   0 3010804 188052  14280 S 137.8  3.3   5663:24 java

这是该进程线程上 strace 的输出:

strace -t -p 46322
Process 46322 attached
15:18:47 sched_yield()                  = 0
15:18:47 sched_yield()                  = 0
15:18:47 sched_yield()                  = 0
15:18:47 sched_yield()                  = 0
15:18:47 sched_yield()                  = 0
15:18:47 sched_yield()                  = 0
15:18:47 sched_yield()                  = 0
15:18:47 sched_yield()                  = 0
15:18:47 sched_yield()                  = 0
15:18:47 sched_yield()                  = 0
15:18:47 sched_yield()                  = 0
15:18:47 sched_yield()                  = 0
15:18:47 sched_yield()                  = 0
15:18:47 sched_yield()                  = 0
15:18:47 sched_yield()                  = 0
....and so on

我为什么使用Thread.yield(),是因为这个问题

如果您需要其他调试信息,请告诉我。

现在的问题是,如何最小化CPU利用率?

3个回答

7
整个BlockingQueue的目的就是在队列为空时阻塞。因此,消费者线程(向Cassandra填充数据的线程)不必手动检查它们是否为空。您只需调用take(),如果队列为空,则该调用将被阻塞,除非它被中断或有元素可用。
当一个线程被阻塞时,调度程序可以安排其他线程来代替它,这样就不必调用yield()等方法。请记住,只有优先级大于或等于正在yield()的线程的线程可用于运行时,yield()才会让出线程。
public void run(){
    LOGGER.log(Level.INFO, "Thread Created: " +Thread.currentThread().getName());
    try {
            JSONObject msg = content.take();
            // JSON
            for(String tableName : tableList){
                CassandraConnector.getSession().execute(createQuery(tableName, msg));
            }
     } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
     }
}

你能请加上一个代码片段吗?我会尝试在@Robert的回答中添加这个块。 - vish4071
谢谢。还有一个问题,.take() 方法会弹出元素吗? - vish4071
还有一件事。现在不需要使用.yield是吗?如果是这样...你能告诉我原因吗?为什么我们不使用isInterrupted()检查? - vish4071
是的,正如我在上面的回答中提到的那样,yield()不再是必需的。take()是一个阻塞调用,它将线程设置为BLOCKED状态,因此它不会消耗任何资源。调度程序负责安排其他可运行的线程。 - MS Srikkanth
那么isInterrupted()检查呢?这将运行一次并执行一个查询...然后这个线程会死掉...不是吗? - vish4071
是的,它将在执行单个查询后停止。如果您需要它持续运行,则像原始代码中一样添加while循环。我的想法只是向您展示如何使用take(),因此我只挑选了相关的代码。但请记住,您代码中的while循环永远不会停止,因为您正在捕获InterruptedException并且没有采取任何措施。当捕获InterruptedException时,中断状态会自动清除。因此,您必须像我所做的那样手动设置它。 - MS Srikkanth

3
从您的代码看来,似乎您的消费者线程总是在检查是否有可用内容。因此,您的线程始终在运行,并且从不空闲(等待有人通知它们),因此即使它总是将当前线程让给其他线程,但您的CPU也一直在做某些事情。
显然,您正在尝试解决我们中许多人在编程生涯中遇到的生产者-消费者问题。
您目前正在做的是让消费者主动不断地检查是否有可消费的内容。
最简单、最易于CPU处理的解决方法是:
1. 生产者向消费者发出已经生产了一些东西的信号。
请参考这个例子,其中包含了最简单的实现方式。您可能还想查看Java并发实践以获取更深入的帮助。

2
@vish4071,你应该按照设计使用BlockingQueue:使用其中一个阻塞方法:take()或者poll(timeout, unit) - Mark Rotteveel

0

正如其他答案中已经描述的那样,您正在执行繁忙等待而不是使用您的content BlockingQueue 的核心特性:等待下一个条目并将其从队列中移除。这可以使用take()方法来完成:

while (!Thread.currentThread().isInterrupted()) {
    try {
        JSONObject msg = content.take();
        for(String tableName : tableList){
            CassandraConnector.getSession().execute(createQuery(tableName, msg));
        }
    } catch (Exception e) {

    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接