Kotlin coroutines - using the main thread in runBlocking


I'm trying to execute the following code:

 val jobs = listOf(...)
 return runBlocking(CommonPool) {
    val executed = jobs.map {
        async { it.execute() }
    }.toTypedArray()
    awaitAll(*executed)
 }

where jobs is a list of Suppliers. In the synchronous world this would simply produce, for example, a list of integers.

Everything works fine, but the problem is that the main thread is not being utilized. Here is a screenshot from YourKit:

[Image: YourKit thread view]

So the question is: how can I make use of the main thread as well?

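I suppose runBlocking is the problem here, but is there another way to get the same result? Java parallel streams look better, but the main thread is still not fully utilized (the tasks are completely independent).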

Update

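OK, maybe I told you too little. My question came up after watching Venkat Subramaniam's talk: https://youtu.be/0hQvWIdwnw4. I need maximum performance: no IO, no UI, etc., only computation. There is only one request, and I need to use all available resources.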

The only thing I have come up with is setting the parallelism to the number of threads + 1, but I think that's rather silly.

4 Answers


I tested the solution with Java 8 parallel streams:

jobs.parallelStream().forEach { it.execute() }

I found that CPU utilization stays at 100% the whole time. For reference, I used this computational task:

import java.util.concurrent.ThreadLocalRandom

class MyJob {
    fun execute(): Double {
        val rnd = ThreadLocalRandom.current()
        var d = 1.0
        (1..rnd.nextInt(1_000_000)).forEach { _ ->
            d *= 1 + rnd.nextDouble(0.0000001)
        }
        return d
    }
}

Note that its duration varies randomly, from zero up to the time needed for roughly 1,000,000 floating-point multiplications.
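
One way to check the per-thread picture without a profiler is to measure, inside each job, how much CPU time the executing thread spends, using the JDK's `ThreadMXBean`. This is a sketch, not part of the original answer: the helper name `cpuMillisPerThread` and the busy loop standing in for `MyJob.execute()` are assumptions. In current JDK implementations a parallel stream's terminal operation runs its root fork/join task on the calling thread, so that thread should appear next to the `ForkJoinPool.commonPool-worker-*` threads.

```kotlin
import java.lang.management.ManagementFactory
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper: runs `jobCount` CPU-bound jobs through a parallel stream and
// returns, per thread name, the CPU time (ms) that thread spent inside the jobs.
fun cpuMillisPerThread(jobCount: Int): Map<String, Long> {
    val mx = ManagementFactory.getThreadMXBean()
    val cpuNanos = ConcurrentHashMap<String, Long>()
    val total = (1..jobCount).toList().parallelStream().mapToDouble { n ->
        val start = mx.currentThreadCpuTime      // -1 if CPU-time measurement is unsupported
        var d = 1.0
        repeat(100_000) { d *= 1 + 1e-12 * n }    // stand-in for MyJob.execute()
        val spent = mx.currentThreadCpuTime - start
        cpuNanos.merge(Thread.currentThread().name, spent) { a, b -> a + b }
        d
    }.sum()
    check(total >= jobCount)                     // every job returns d >= 1.0
    return cpuNanos.mapValues { it.value / 1_000_000 }
}

fun main() {
    val caller = Thread.currentThread().name
    cpuMillisPerThread(2_000).toSortedMap().forEach { (thread, ms) ->
        println("$thread: $ms ms CPU" + if (thread == caller) "  <- calling thread" else "")
    }
}
```

Comparable figures for the calling thread and the pool workers would match the observation that the main thread is kept busy; where CPU-time measurement is unsupported the deltas simply come out as 0 ms.
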
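Out of curiosity, I also looked at the code you added to your question as a solution. I found some problems with it, for example:
- it accumulates all the results in a list instead of processing them as they become available
- it closes the result channel right after submitting the last job instead of waiting for all the results
I wrote my own version and added benchmark code that compares it with the Stream API one-liner. Here is the code:
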
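import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select
import java.util.concurrent.ForkJoinPool.commonPool
import java.util.stream.Collectors.summingDouble
import kotlin.system.measureTimeMillis

// Uses MyJob from the previous snippet
const val NUM_JOBS = 1000
val jobs = (0 until NUM_JOBS).map { MyJob() }


fun parallelStream(): Double =
        jobs.parallelStream().map { it.execute() }.collect(summingDouble { it })

fun channels(): Double {
    val resultChannel = Channel<Double>(Channel.UNLIMITED)

    val mainComputeChannel = Channel<MyJob>()
    // One actor per common-pool worker; each runs jobs and reports their results
    val poolComputeChannels = (1..commonPool().parallelism).map { _ ->
        GlobalScope.actor<MyJob>(Dispatchers.Default) {
            for (job in channel) {
                job.execute().also { resultChannel.send(it) }
            }
        }
    }
    val allComputeChannels = poolComputeChannels + mainComputeChannel

    // Launch a coroutine that submits the jobs
    GlobalScope.launch {
        jobs.forEach { job ->
            select<Unit> {
                allComputeChannels.forEach { chan ->
                    chan.onSend(job) {}
                }
            }
        }
        // Let the pool actors terminate instead of leaking them on every call.
        // The main channel stays open: onReceive on a closed channel would throw below.
        poolComputeChannels.forEach { it.close() }
    }

    // Run the main loop which takes turns between running a job
    // submitted to the main thread channel and receiving a result
    return runBlocking {
        var completedCount = 0
        var sum = 0.0
        while (completedCount < NUM_JOBS) {
            select<Unit> {
                mainComputeChannel.onReceive { job ->
                    job.execute().also { resultChannel.send(it) }
                }
                resultChannel.onReceive { result ->
                    sum += result
                    completedCount++
                }
            }
        }
        sum
    }
}

fun main(args: Array<String>) {
    measure("Parallel Stream", ::parallelStream)
    measure("Channels", ::channels)
    measure("Parallel Stream", ::parallelStream)
    measure("Channels", ::channels)
}

fun measure(task: String, measuredCode: () -> Double) {
    // Print only the integer part so the result is used and the output stays short
    val block = { print(measuredCode().toString().substringBefore('.')) }
    println("Warming up $task")
    (1..20).forEach { _ -> block() }
    println("\nMeasuring $task")
    val average = (1..20).map { measureTimeMillis(block) }.average()
    println("\n$task took $average ms")
}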

Here are my typical results:
Parallel Stream took 396.85 ms
Channels took 398.1 ms

The results are similar, but one line of code still beats 50 :)

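Now imagine I have 10,000 jobs and their durations are not equal. I would use executors, but I'm looking for a more efficient solution. Why? https://blog.takipi.com/forkjoin-framework-vs-parallel-streams-vs-executorservice-the-ultimate-benchmark/ - Witold Kupś
I just checked the utilization: with the one-liner it reaches 100%. The main thread's CPU time is exactly the same as that of each common-pool thread. I know you may just be looking for an interesting challenge, but in terms of practical efficiency you can't beat Java streams. - Marko Topolnik
OK. I increased the common pool size to 8 with -Djava.util.concurrent.ForkJoinPool.common.parallelism=8, and YourKit still (actually, again) shows the main thread as mostly orange bars; Task Manager agrees, with CPU utilization at 90-95%. I'm just discussing. - Witold Kupś
I didn't use that and still got 100%. I'm on macOS, using this computation: class MyJob(val index: Int) { fun execute(): Double { val rnd = ThreadLocalRandom.current(); var d = 1.0; (1..rnd.nextInt(100_000_000)).forEach { d *= 1 + rnd.nextDouble(0.0000001) }; return d } } So the duration varies randomly, with a uniform distribution. Instead of YourKit I used VisualVM and the native macOS task manager. - Marko Topolnik
I'll post further comments and findings in a chat room. - Marko Topolnik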
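
Both the "threads + 1" workaround in the question and the `-Djava.util.concurrent.ForkJoinPool.common.parallelism=8` experiment depend on how the common pool is sized. The snippet below only prints the relevant numbers. It assumes a standard JDK, where the default common-pool parallelism is one less than `availableProcessors()` (but at least 1) because the thread that starts a parallel stream also works on it. The property is read once, when the common pool is created, so it has to be passed on the command line.

```kotlin
import java.util.concurrent.ForkJoinPool

fun main() {
    val cores = Runtime.getRuntime().availableProcessors()
    // Honors -Djava.util.concurrent.ForkJoinPool.common.parallelism=N if it was given;
    // otherwise this is the JDK default (cores - 1, at least 1).
    val parallelism = ForkJoinPool.getCommonPoolParallelism()
    println("available processors:    $cores")
    println("common pool parallelism: $parallelism")
    // A parallel stream started from this thread runs on the pool workers and on this
    // thread, so it typically keeps up to parallelism + 1 threads busy.
    println("threads busy per stream: ${parallelism + 1}")
}
```

With the default sizing, pool workers plus the caller add up to the core count; setting the property to the core count yields one thread more than there are cores.
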


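Even if no work runs on this particular thread, that doesn't mean the device isn't running other threads on the same core.

In fact, leaving your main thread idle is the better choice: it keeps your UI more responsive.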


Does every application have a UI? - Witold Kupś

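First, I want to stress that utilizing the main thread usually brings no practical benefit.
If your application is fully asynchronous, only a single (main) thread is blocked. That thread does consume some memory and adds a little scheduling pressure, but the performance impact is negligible, probably not even measurable.
In the real Java world it is practically impossible to have a fixed number of threads in the JVM: there are system threads (GC), NIO threads, and so on.
One thread makes no difference. As long as the number of threads in your application doesn't grow without bound as load increases, you're fine.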
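
To see that a JVM never runs only your own threads, you can list its live Java threads from inside any program. This is just an illustration: the names depend on the JVM (on HotSpot you will typically see housekeeping threads such as "Reference Handler" and "Signal Dispatcher"), and HotSpot's GC worker threads are native threads that do not appear in this list at all, so the real number of OS threads is higher still.

```kotlin
fun main() {
    // All live java.lang.Thread instances in this JVM, including the current one
    val threads = Thread.getAllStackTraces().keys.sortedBy { it.name }
    println("${threads.size} live Java threads:")
    for (t in threads) {
        println("  ${t.name} (daemon=${t.isDaemon})")
    }
}
```
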

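Back to the original question.

I don't think there is a clean way to make use of the main thread in this kind of parallel task processing.

For example, you could do the following: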

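import kotlinx.coroutines.Deferred
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select
import java.util.concurrent.ForkJoinPool

// CommonPool is not public in current kotlinx.coroutines releases; wrapping the JDK
// common pool runs on the same ForkJoinPool.commonPool-worker-N threads
val commonPool = ForkJoinPool.commonPool().asCoroutineDispatcher()

data class Job(val res: Int) {
    fun execute(): Int {
        Thread.sleep(100)
        println("execute $res in ${Thread.currentThread().name}")
        return res
    }
}

fun main() {
    val jobs = (1..100).map { Job(it) }
    val resultChannel = Channel<Int>(Channel.UNLIMITED)
    val mainInputChannel = Channel<Job>()

    // Ten consumers running on the common pool
    val workers = (1..10).map {
        GlobalScope.actor<Job>(commonPool) {
            for (j in channel) {
                resultChannel.send(j.execute())
            }
        }
    }

    // Producer: hands each job to whichever consumer is ready, the main thread included
    val res: Deferred<List<Int>> = GlobalScope.async(commonPool) {
        val allChannels = (listOf(mainInputChannel) + workers)

        jobs.forEach { job ->
            select<Unit> {
                allChannels.forEach {
                    it.onSend(job) {}
                }
            }
        }

        allChannels.forEach { it.close() }
        // Wait for exactly jobs.size results instead of relying on channel closure
        (1..jobs.size).map { resultChannel.receive() }
    }

    // The main thread is one more consumer until mainInputChannel is closed
    runBlocking {
        for (j in mainInputChannel) {
            resultChannel.send(j.execute())
        }
    }

    runBlocking {
        res.await().forEach { println(it) }
    }
}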

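Basically, this is a simple producer/consumer implementation in which the main thread acts as one of the consumers. But it results in a lot of boilerplate.

Output: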

execute 1 in main @coroutine#12
execute 5 in ForkJoinPool.commonPool-worker-1 @coroutine#4
execute 6 in ForkJoinPool.commonPool-worker-2 @coroutine#5
execute 7 in ForkJoinPool.commonPool-worker-7 @coroutine#6
execute 2 in ForkJoinPool.commonPool-worker-6 @coroutine#1
execute 8 in ForkJoinPool.commonPool-worker-4 @coroutine#7
execute 4 in ForkJoinPool.commonPool-worker-5 @coroutine#3
execute 3 in ForkJoinPool.commonPool-worker-3 @coroutine#2
execute 12 in main @coroutine#12
execute 10 in ForkJoinPool.commonPool-worker-7 @coroutine#9
execute 15 in ForkJoinPool.commonPool-worker-5 @coroutine#6
execute 11 in ForkJoinPool.commonPool-worker-3 @coroutine#10
execute 16 in ForkJoinPool.commonPool-worker-6 @coroutine#1
execute 9 in ForkJoinPool.commonPool-worker-1 @coroutine#8
execute 14 in ForkJoinPool.commonPool-worker-4 @coroutine#5
execute 13 in ForkJoinPool.commonPool-worker-2 @coroutine#4
execute 20 in main @coroutine#12
execute 17 in ForkJoinPool.commonPool-worker-5 @coroutine#2
execute 18 in ForkJoinPool.commonPool-worker-3 @coroutine#3
execute 24 in ForkJoinPool.commonPool-worker-1 @coroutine#6
execute 23 in ForkJoinPool.commonPool-worker-4 @coroutine#5
execute 22 in ForkJoinPool.commonPool-worker-2 @coroutine#4
execute 19 in ForkJoinPool.commonPool-worker-7 @coroutine#7
execute 21 in ForkJoinPool.commonPool-worker-6 @coroutine#1
execute 25 in ForkJoinPool.commonPool-worker-5 @coroutine#8
execute 28 in main @coroutine#12
execute 29 in ForkJoinPool.commonPool-worker-2 @coroutine#2
execute 30 in ForkJoinPool.commonPool-worker-7 @coroutine#3
execute 27 in ForkJoinPool.commonPool-worker-4 @coroutine#10
execute 26 in ForkJoinPool.commonPool-worker-1 @coroutine#9
execute 32 in ForkJoinPool.commonPool-worker-3 @coroutine#4
execute 31 in ForkJoinPool.commonPool-worker-6 @coroutine#1
execute 36 in ForkJoinPool.commonPool-worker-5 @coroutine#8
execute 35 in ForkJoinPool.commonPool-worker-4 @coroutine#7
execute 33 in ForkJoinPool.commonPool-worker-2 @coroutine#5
execute 38 in ForkJoinPool.commonPool-worker-3 @coroutine#2
execute 37 in main @coroutine#12
execute 34 in ForkJoinPool.commonPool-worker-7 @coroutine#6
execute 39 in ForkJoinPool.commonPool-worker-6 @coroutine#3
execute 40 in ForkJoinPool.commonPool-worker-1 @coroutine#1
execute 44 in ForkJoinPool.commonPool-worker-5 @coroutine#8
execute 41 in ForkJoinPool.commonPool-worker-4 @coroutine#4
execute 46 in ForkJoinPool.commonPool-worker-1 @coroutine#2
execute 47 in ForkJoinPool.commonPool-worker-6 @coroutine#1
execute 45 in main @coroutine#12
execute 42 in ForkJoinPool.commonPool-worker-2 @coroutine#9
execute 43 in ForkJoinPool.commonPool-worker-7 @coroutine#10
execute 48 in ForkJoinPool.commonPool-worker-3 @coroutine#3
execute 52 in ForkJoinPool.commonPool-worker-5 @coroutine#8
execute 49 in ForkJoinPool.commonPool-worker-1 @coroutine#5
execute 54 in ForkJoinPool.commonPool-worker-2 @coroutine#1
execute 53 in main @coroutine#12
execute 50 in ForkJoinPool.commonPool-worker-4 @coroutine#6
execute 51 in ForkJoinPool.commonPool-worker-6 @coroutine#7
execute 56 in ForkJoinPool.commonPool-worker-3 @coroutine#3
execute 55 in ForkJoinPool.commonPool-worker-7 @coroutine#2
execute 60 in ForkJoinPool.commonPool-worker-5 @coroutine#8
execute 61 in ForkJoinPool.commonPool-worker-1 @coroutine#5
execute 57 in ForkJoinPool.commonPool-worker-4 @coroutine#4
execute 59 in ForkJoinPool.commonPool-worker-3 @coroutine#10
execute 64 in ForkJoinPool.commonPool-worker-7 @coroutine#2
execute 58 in ForkJoinPool.commonPool-worker-6 @coroutine#9
execute 62 in ForkJoinPool.commonPool-worker-2 @coroutine#1
execute 63 in main @coroutine#12
execute 68 in ForkJoinPool.commonPool-worker-5 @coroutine#8
execute 65 in ForkJoinPool.commonPool-worker-1 @coroutine#3
execute 66 in ForkJoinPool.commonPool-worker-4 @coroutine#6
execute 67 in ForkJoinPool.commonPool-worker-7 @coroutine#7
execute 69 in ForkJoinPool.commonPool-worker-6 @coroutine#4
execute 70 in ForkJoinPool.commonPool-worker-3 @coroutine#2
execute 74 in ForkJoinPool.commonPool-worker-2 @coroutine#1
execute 75 in main @coroutine#12
execute 71 in ForkJoinPool.commonPool-worker-5 @coroutine#5
execute 76 in ForkJoinPool.commonPool-worker-7 @coroutine#3
execute 73 in ForkJoinPool.commonPool-worker-6 @coroutine#10
execute 78 in ForkJoinPool.commonPool-worker-4 @coroutine#6
execute 72 in ForkJoinPool.commonPool-worker-1 @coroutine#9
execute 77 in ForkJoinPool.commonPool-worker-3 @coroutine#8
execute 79 in ForkJoinPool.commonPool-worker-2 @coroutine#1
execute 83 in main @coroutine#12
execute 84 in ForkJoinPool.commonPool-worker-4 @coroutine#3
execute 85 in ForkJoinPool.commonPool-worker-5 @coroutine#5
execute 82 in ForkJoinPool.commonPool-worker-1 @coroutine#7
execute 81 in ForkJoinPool.commonPool-worker-6 @coroutine#4
execute 80 in ForkJoinPool.commonPool-worker-7 @coroutine#2
execute 89 in ForkJoinPool.commonPool-worker-3 @coroutine#8
execute 90 in ForkJoinPool.commonPool-worker-2 @coroutine#1
execute 91 in main @coroutine#12
execute 86 in ForkJoinPool.commonPool-worker-5 @coroutine#6
execute 88 in ForkJoinPool.commonPool-worker-6 @coroutine#10
execute 87 in ForkJoinPool.commonPool-worker-1 @coroutine#9
execute 92 in ForkJoinPool.commonPool-worker-7 @coroutine#2
execute 93 in ForkJoinPool.commonPool-worker-4 @coroutine#3
execute 99 in main @coroutine#12
execute 97 in ForkJoinPool.commonPool-worker-3 @coroutine#8
execute 98 in ForkJoinPool.commonPool-worker-2 @coroutine#1
execute 95 in ForkJoinPool.commonPool-worker-1 @coroutine#5
execute 100 in ForkJoinPool.commonPool-worker-4 @coroutine#6
execute 94 in ForkJoinPool.commonPool-worker-5 @coroutine#4
execute 96 in ForkJoinPool.commonPool-worker-7 @coroutine#7
1
5
6
7
2
8
4
3
12
10
15
11
16
9
14
13
20
17
18
24
23
22
19
21
25
28
29
30
27
26
32
31
36
35
33
38
37
34
39
40
44
41
46
47
45
42
43
48
52
49
54
53
50
51
56
55
60
61
57
59
64
58
62
63
68
65
66
67
69
70
74
75
71
76
73
78
72
77
79
83
84
85
82
81
80
89
90
91
86
88
87
92
93
99
97
98
95
100
94
96
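
The producer/consumer idea above, with the main thread as one of the consumers, does not strictly need channels. The sketch below swaps them for a different, lighter technique (it is not Aivean's code): a shared `AtomicInteger` cursor from which a few executor threads and the calling thread claim job indices until none remain, which also balances jobs of unequal duration. `runAllUsingCaller` and `helpers` are hypothetical names.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReferenceArray

// Hypothetical helper: runs every job exactly once on `helpers` pool threads plus the
// calling thread. Each worker claims the next unclaimed index from a shared cursor,
// so faster workers simply take more jobs.
@Suppress("UNCHECKED_CAST")
fun <T> runAllUsingCaller(jobs: List<() -> T>, helpers: Int): List<T> {
    require(helpers >= 1) { "need at least one helper thread" }
    val next = AtomicInteger(0)
    val results = AtomicReferenceArray<Any?>(jobs.size)
    val drain = Runnable {
        while (true) {
            val i = next.getAndIncrement()
            if (i >= jobs.size) break
            results.set(i, jobs[i]())
        }
    }
    val pool = Executors.newFixedThreadPool(helpers)
    try {
        val futures = List(helpers) { pool.submit(drain) }
        drain.run()                       // the calling (e.g. main) thread is a consumer too
        futures.forEach { it.get() }      // wait for the helpers; rethrows their failures
    } finally {
        pool.shutdown()
    }
    return List(jobs.size) { results.get(it) as T }
}

fun main() {
    val jobs = (1..100).map { n -> { n * n } }   // 100 trivial jobs
    val squares = runAllUsingCaller(jobs, helpers = 3)
    println(squares.take(5))                     // [1, 4, 9, 16, 25]
}
```

Results are stored by index, so their order matches the job list regardless of which thread ran each job.
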

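I considered channels but wanted to avoid them because of the boilerplate and complexity you mentioned. Still, I'll give it a try and report back, thanks. - Witold Kupś
@WitoldKupś, I just found a nasty race condition in my code, sorry about that. Please look at the updated version, especially the (1..jobs.size).map { resultChannel.receive() } part. The problem was that the result channel was closed before all the jobs had finished. Also, I really encourage you to compare your application's performance with Java parallel streams versus channels. I bet you won't notice a difference, or the channels will be slower because of synchronization overhead. The simple solution is usually the better one. Good luck! - Aivean
In the end I went with streams, but in my opinion this is still an alternative worth considering. - Witold Kupś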


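async() without any arguments takes its dispatcher from the parent coroutine (falling back to DefaultDispatcher), so in your code all the async calls execute in CommonPool. If you want a different set of threads to run your code, create your own pool. Although it's usually better not to use the main thread for computation, that depends on your use case.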
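
Creating your own pool could look like the sketch below, assuming a recent kotlinx.coroutines release: wrap an `ExecutorService` with `asCoroutineDispatcher()`, hand it to `runBlocking`, and every argument-less `async {}` inside inherits it. The helper `threadNamesOnOwnPool`, the pool size and the thread name `own-pool` are illustrative. With a dispatcher passed to `runBlocking`, the calling thread only blocks; it runs none of the jobs.

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import java.util.concurrent.Executors

// Hypothetical helper: runs `jobCount` trivial jobs on a dedicated pool and returns,
// for each job, the name of the thread that executed it.
fun threadNamesOnOwnPool(jobCount: Int, poolSize: Int): List<String> =
    Executors.newFixedThreadPool(poolSize) { r -> Thread(r, "own-pool") }
        .asCoroutineDispatcher()
        .use { dispatcher ->                    // closing the dispatcher shuts the pool down
            runBlocking(dispatcher) {
                // No dispatcher argument: each async inherits `dispatcher` from runBlocking
                (1..jobCount).map { async { Thread.currentThread().name } }.awaitAll()
            }
        }

fun main() {
    println(threadNamesOnOwnPool(jobCount = 8, poolSize = 4).distinct())
}
```

Printed names may carry an `@coroutine#N` suffix when coroutine debug mode is enabled.
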


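If something is necessary, then it's good practice :) The common pool is a fork/join pool, so I thought the main thread should be able to take on part of the work while it waits. And as far as I know (see the video linked in the updated question), parallel streams can do that. - Witold Kupś
The default ForkJoinPool implementation creates worker threads in addition to your main thread and does not use the main thread as one of its workers. So you could create your own CoroutineDispatcher and implement the load balancing in it. You can use parallel streams, or whatever you like, to consume the work from it and reuse it elsewhere. I don't think this functionality is part of the standard library yet. - DmitryBorodin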

Content provided by Stack Overflow.