我正在使用一个服务,从Kafka读取消息并将其推送到Cassandra。
我正在使用线程架构来实现相同的功能。
假设有k个线程从Kafka主题中消费。这些线程将写入一个声明为以下内容的队列:
现在有许多线程,比如
我正在使用线程架构来实现相同的功能。
假设有k个线程从Kafka主题中消费。这些线程将写入一个声明为以下内容的队列:
public static BlockingQueue<>
现在有许多线程,比如
n
条线程,它们要写入Cassandra。这是执行此操作的代码:public void run(){
LOGGER.log(Level.INFO, "Thread Created: " +Thread.currentThread().getName());
while (!Thread.currentThread().isInterrupted()) {
Thread.yield();
if (!content.isEmpty()) {
try {
JSONObject msg = content.remove();
// JSON
for(String tableName : tableList){
CassandraConnector.getSession().execute(createQuery(tableName, msg));
}
} catch (Exception e) {
}
}
}
}
content
是用于读写操作的阻塞队列。
我在线程实现中扩展了Thread
类,有一定数量的线程在不被中断的情况下继续执行。
问题是,这使用了太多的CPU。以下是top
命令的第一行:
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
46232 vishran+ 20 0 3010804 188052 14280 S 137.8 3.3 5663:24 java
这是该进程线程上 strace
的输出:
strace -t -p 46322
Process 46322 attached
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
....and so on
我为什么使用Thread.yield()
,是因为这个问题
如果您需要其他调试信息,请告诉我。
现在的问题是,如何最小化CPU利用率?
.take()
方法会弹出元素吗? - vish4071.yield
是吗?如果是这样...你能告诉我原因吗?为什么我们不使用isInterrupted()
检查? - vish4071isInterrupted()
检查呢?这将运行一次并执行一个查询...然后这个线程会死掉...不是吗? - vish4071