我使用了Java 8并行流测试了该解决方案:
jobs.parallelStream().forEach { it.execute() }
我发现CPU利用率始终保持在100%。 作为参考,我使用了这个计算任务:
class MyJob {
fun execute(): Double {
val rnd = ThreadLocalRandom.current()
var d = 1.0
(1..rnd.nextInt(1_000_000)).forEach { _ ->
d *= 1 + rnd.nextDouble(0.0000001)
}
return d
}
}
请注意,其持续时间随机变化,从零到执行100,000,000次浮点数乘法所需的时间不等。
出于好奇心,我也研究了您在问题中添加的代码作为解决方案。我发现其中存在一些问题,例如:
- 将所有结果累积到列表中,而不是在它们可用时处理它们
- 在提交最后一个作业后立即关闭结果通道,而不是等待所有结果
我编写了自己的代码,并添加了用于对比Stream API单行代码性能的基准测试代码。以下是代码:
const val NUM_JOBS = 1000
val jobs = (0 until NUM_JOBS).map { MyJob() }
fun parallelStream(): Double =
jobs.parallelStream().map { it.execute() }.collect(summingDouble { it })
fun channels(): Double {
val resultChannel = Channel<Double>(UNLIMITED)
val mainComputeChannel = Channel<MyJob>()
val poolComputeChannels = (1..commonPool().parallelism).map { _ ->
GlobalScope.actor<MyJob>(Dispatchers.Default) {
for (job in channel) {
job.execute().also { resultChannel.send(it) }
}
}
}
val allComputeChannels = poolComputeChannels + mainComputeChannel
GlobalScope.launch {
jobs.forEach { job ->
select {
allComputeChannels.forEach { chan ->
chan.onSend(job) {}
}
}
}
}
return runBlocking {
var completedCount = 0
var sum = 0.0
while (completedCount < NUM_JOBS) {
select<Unit> {
mainComputeChannel.onReceive { job ->
job.execute().also { resultChannel.send(it) }
}
resultChannel.onReceive { result ->
sum += result
completedCount++
}
}
}
sum
}
}
fun main(args: Array<String>) {
measure("Parallel Stream", ::parallelStream)
measure("Channels", ::channels)
measure("Parallel Stream", ::parallelStream)
measure("Channels", ::channels)
}
fun measure(task: String, measuredCode: () -> Double) {
val block = { print(measuredCode().toString().substringBefore('.')) }
println("Warming up $task")
(1..20).forEach { _ -> block() }
println("\nMeasuring $task")
val average = (1..20).map { measureTimeMillis(block) }.average()
println("\n$task took $average ms")
}
这是我的典型结果:
Parallel Stream took 396.85 ms
Channels took 398.1 ms
结果很相似,但一行代码仍然胜过50行代码 :)
-Djava.util.concurrent.ForkJoinPool.common.parallelism=8
将公共池大小增加到8,并且YourKit仍然(实际上 - 再次)显示主线程大部分为橙色条,与任务管理器相同 - CPU利用率在90-95%水平。我只是在讨论。 - Witold Kupśclass MyJob(val index: Int) { fun execute(): Double { val rnd = ThreadLocalRandom.current(); var d = 1.0; (1..rnd.nextInt(100_000_000)).forEach { d *= 1 + rnd.nextDouble(0.0000001) }; return d } }
因此,持续时间具有随机变化,均匀分布。我没有使用YourKit,而是使用VisualVM和MacOS本地任务管理器。 - Marko Topolnik