Kotlin协程 vs CompletableFuture

6

能有人解释一下为什么人们应该使用协程吗?是否有一些协程代码示例可以显示出与常规java并发代码相比更好的完成时间(没有神奇的delay()函数,没人在生产中使用delay())?

以我的个人例子来看,协程(第1行)对于java代码(第2行)来说不怎么样。也许我做错了什么?

示例:

import kotlinx.coroutines.*
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Future

@ExperimentalCoroutinesApi
fun main() = runBlocking {
    val begin = Instant.now().toEpochMilli()
    val jobs = List(150_000) {
        GlobalScope.launch { print(getText().await()) } // :1
//        CompletableFuture.supplyAsync { "." }.thenAccept { print(it) } // :2
    }
    jobs.forEach { it.join() }
    println(Instant.now().toEpochMilli() - begin)
}

fun getText(): Future<String> {
    return CompletableFuture.supplyAsync {
        "."
    }
}

@ExperimentalCoroutinesApi
suspend fun <T> Future<T>.await(): T = suspendCancellableCoroutine { cont ->
    cont.resume(this.get()) {
        this.cancel(true)
    }
}

补充问题:

为什么我需要创建这个协程包装器await()?否则,如果使用get()方法,它会产生inappropriate blocking method call的警告,但似乎并没有提高协程版本的代码效率?


5
delay()实际上是生产代码的一个很好的近似,因为协程在等待响应返回时会被暂停。这里有一个不错的总结:使用线程来并行处理工作,使用协程来并行等待。 - Marko Topolnik
4个回答

12
协程的目标不是“更好的完成时间”。它的目标——它实际上做得很好——是协程更易于使用。
话虽如此,你在代码中所做的绝对不是比较两种替代方法速度的好方法。在Java中比较事物的速度并获得真实结果非常困难,你应该阅读至少如何在Java中编写正确的微基准?才能尝试。你当前尝试比较两个Java代码片段的方式将会对你的代码的真实性能行为进行欺骗。
回答你的额外问题,答案是你不应该创建那个await方法。无论是在suspendCancellableCoroutine还是其他情况下,你都不应该使用get()或java.util.concurrent.Future与协程代码一起使用。如果你想使用CompletableFuture,请使用提供的库来与协程代码交互。

谢谢您的答复。我已经在我的示例中添加了这个库并得到了更好的结果,但仍然比Java差。现在我已经明白了包装器的目的。我的包装器并不真正可暂停。 - lalilulelo_1986

3
这是我用于基准测试的代码清理版本。请注意,我从测量的代码中删除了print,因为打印本身是一个重量级操作,涉及到互斥、JNI、阻塞输出流等。相反,我更新了一个易失性变量。
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.future.await
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.lang.Thread.sleep
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.concurrent.TimeUnit.NANOSECONDS

@Volatile
var total = 0

@ExperimentalCoroutinesApi
fun main() = runBlocking {
    println("Warmup")
    measure(20_000)
    println("Measure")
    val begin = System.nanoTime()
    measure(40_000)
    println("Completed in ${NANOSECONDS.toMillis(System.nanoTime() - begin)} ms")
}

fun getText(): CompletableFuture<Int> {
    return CompletableFuture.supplyAsync {
        sleep(1)
        1
    }
}

suspend fun measure(count: Int) {
    val jobs = List(count) {
        GlobalScope.launch { total += getText().await() } // :1
//        getText().thenAccept { total += it } // :2
    }
    jobs.forEach { it.join() }
}

对于第一个案例,我的结果是6.5秒,而第二个案例需要7秒。这是一个7%的差异,可能只适用于此特定情况,而不是两种方法之间通常看到的差异。

选择协程而不是基于CompletionStage的编程的原因绝对不是关于那7%的东西,而是方便上的巨大差异。为了理解我的意思,我邀请您通过仅调用computeAsync而不使用future.await()来重写main函数:

suspend fun main() {
    try {
        if (compute(1) == 2) {
            println(compute(4))
        } else {
            println(compute(7))
        }
    } catch (e: RuntimeException) {
        println("Got an error")
        println(compute(8))
    }
}

fun main_no_coroutines() {
    // Let's see how it might look!
}

fun computeAsync(input: Int): CompletableFuture<Int> {
    return CompletableFuture.supplyAsync {
        sleep(1)
        if (input == 7) {
            throw RuntimeException("Input was 7")
        }
        input % 3
    }
}

suspend fun compute(input: Int) = computeAsync(input).await()

关于你的第二段代码,我不太明白。它没有并行性:运行需要>= 3毫秒(为了更好地可见性,可以将其改为sleep(1000),然后需要3秒)。我使用main_no_coroutines()通过复制main()并将compute()替换为computeAsync().get(),将RuntimeException替换为Exception(因为可能是ExecutionException)得到了相同的结果。main_no_coroutines的可读性和易用性大致相同,所以我看不出使用协程版本的优势。 - undefined
第二个代码段确实没有并行性,它是关于选择非阻塞代码的编程范式。我展示了一个看起来简单的顺序非阻塞代码,并挑战读者使用异步编程风格来重写它,以保持非阻塞的特性。Future.get()是一个阻塞函数。 - undefined

2
切换到 kotlinx-coroutines-jdk8 库并将 sleep(1) 添加到我的 getText() 函数后。
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.future.await
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.lang.Thread.sleep
import java.time.Instant
import java.util.concurrent.CompletableFuture

fun main() = runBlocking {
    val begin = Instant.now().toEpochMilli()
    val jobs = List(150_000) {
        GlobalScope.launch { print(getText().await()) } // :1
//        getText().thenAccept { print(it) } // :2
    }
    jobs.forEach { it.join() }
    println(Instant.now().toEpochMilli() - begin)
}

fun getText(): CompletableFuture<String> {
    return CompletableFuture.supplyAsync {
        sleep(1)
        "."
    }
}

我制作的协程版本比Java版本更快!!!显然,当存在一些延迟时,这个额外的协程层就变得有意义了。


如果你想像sleep一样阻塞线程,那么不要考虑协程。它们在Future.get()上并没有太多价值。 - Marko Topolnik
但在我的例子中,使用协程比 CompletableFuture 获得了更好的结果。似乎 sleep 在协程中运行良好。看起来 Java 不会阻塞它。kotlinx-coroutines-jdk8 库 有类似的例子。 - lalilulelo_1986
我真的很想知道你到底想要实现什么。我还想知道你正在使用什么代码,能够给出更好的结果。从我的经验来看,sleep(1) 是主要的瓶颈,其余的并不重要。此外,CompletableFuture.await() 只是一个对基础Java机制的薄包装,因此它几乎不可能比它更快。 - Marko Topolnik
只是想弄清楚这个协程是如何工作的。我添加了正在测试的代码。 - lalilulelo_1986
我正在测试第一行和第二行。 - lalilulelo_1986
但是你也做了一些更改,所以我不知道你到底在测量什么。此外,你从每个作业中打印,但打印本身是一个重量级操作,会抵消结果。我基于你的代码进行了基准测试,并得到了非常相似的结果,相差不超过7%。 - Marko Topolnik

0

我的两个版本的compute方法,没有重写方法签名。我想我明白了你的意思。使用协程,我们可以以熟悉的顺序方式编写复杂的并行代码。但是,由于挂起技术,协程await包装器不能使此工作正常运行,它只是实现了与我相同的逻辑。

import java.lang.Thread.sleep
import java.util.concurrent.CompletableFuture

fun main() {
    try {
        if (compute(1) == 2) {
            println(compute(4))
        } else {
            println(compute(7))
        }
    } catch (e: RuntimeException) {
        println("Got an error")
        println(compute(8))
    }
}

fun compute(input: Int): Int {
    var exception: Throwable? = null
    val supplyAsync = CompletableFuture.supplyAsync {
        sleep(1)
        if (input == 7) {
            throw RuntimeException("Input was 7")
        }
        input % 3
    }.exceptionally {
        exception = it
        throw it
    }
    while (supplyAsync.isDone.not()) {}
    return if (supplyAsync.isCompletedExceptionally) {
        throw exception!!
    } else supplyAsync.get()
}

fun compute2(input: Int): Int {
    try {
        return CompletableFuture.supplyAsync {
            sleep(1)
            if (input == 7) {
                throw RuntimeException("Input was 7")
            }
            input % 3
        }.get()
    } catch (ex: Exception) {
        throw ex.cause!!
    }
}

这个想法是从 compute 中移除 .await(),使其返回一个 CompletableFuture,然后尝试重写调用方逻辑以达到与我的效果相同。这将把很多逻辑移到原本不存在的 compute() 中。 - Marko Topolnik
我不理解你的评论,认为是 .await() 并非使这个工作。实际上,就是该方法将你返回 CompletableFuture 的函数转换成了返回结果的函数,从而允许你使用普通的顺序惯用语法,包括一个大型的 try-catch 块,以捕获任何异步调用时的异常。 - Marko Topolnik
我想说的是,协程并不是为了更容易地处理异常而存在的。 - lalilulelo_1986
是的,我理解你的意思,但对我来说这并不合理,因为只有协程而没有其他东西使得它变得更加容易。 - Marko Topolnik

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接