推荐的延迟Kotlin的buildSequence的方法是什么?

5
我正在尝试轮询分页API,并在新项目出现时向用户提供这些项目。
fun connect(): Sequence<T> = buildSequence {
    while (true) {
        // result is a List<T>
        val result = dataSource.getFirstPage()
        yieldAll(/* the new data in `result` */)

        // Block the thread for a little bit
    }
}

以下是示例用法:
for (item in connect()) {
    // do something as each item is made available
}

我的第一个想法是使用delay函数,但是我收到了这个消息:

受限制的挂起函数只能在其受限制的协程范围内调用成员或扩展的挂起函数。

这是buildSequence的签名:

public fun <T> buildSequence(builderAction: suspend SequenceBuilder<T>.() -> Unit): Sequence<T>

我认为这条消息的意思是我只能在SequenceBuilder中使用挂起函数:yieldyieldAll,而任意调用挂起函数是不允许的。
现在,每当API被轮询一次后,我都使用以下方法阻塞序列构建一秒钟:
val resumeTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(1)
while (resumeTime > System.nanoTime()) {
    // do nothing
}

这个方法能够运行,但似乎并不是个好的解决方案。有人之前遇到过这个问题吗?


1
你当前的hackaround实际上在运行序列时会占用整个CPU线程的100%...天哪。在此期间,请使用计划执行器+并发队列。 - F. George
1
请注意,buildSequence 只能同步延迟执行:由于协程是由迭代器运行的,它总是期望一个适当的序列元素来挂起并将其作为迭代器的下一个项返回。这就是为什么它会 RestrictsSuspension 的原因。另一个问题是,您使用主动等待而不是阻塞线程。 - hotkey
2个回答

12

为什么它不能工作?一些研究

当我们查看buildSequence时,可以看到它将builderAction: suspend SequenceBuilder<T>.() -> Unit作为其参数。作为该方法的客户端,您将能够传递一个具有SequenceBuilder作为其接收器的suspend lambda(阅读有关带接收器的lambda 这里)。
SequenceBuilder本身带有RestrictSuspension注释:

@RestrictsSuspension
@SinceKotlin("1.1")
public abstract class SequenceBuilder<in T> ...

注释的定义和说明如下:
/**
 * Classes and interfaces marked with this annotation are restricted
 * when used as receivers for extension `suspend` functions. 
 * These `suspend` extensions can only invoke other member or extension     
 * `suspend` functions on this particular receiver only 
 * and are restricted from calling arbitrary suspension functions.
 */
@SinceKotlin("1.1") @Target(AnnotationTarget.CLASS) @Retention(AnnotationRetention.BINARY)
public annotation class RestrictsSuspension

正如RestrictSuspension文档所述,在buildSequence的情况下,您可以传递一个带有SequenceBuilder作为其接收者的lambda,但是具有受限制的可能性,因为您只能在这个特定接收者上调用“其他成员或扩展suspend函数”。这意味着传递给buildSequence的块可以调用在SequenceBuilder上定义的任何方法(如yieldyieldAll)。另一方面,由于该块“无法调用任意挂起函数”,因此使用delay无效。生成的编译器错误进行了验证:

受限挂起函数只能在其受限协程范围内调用成员或扩展挂起函数。

最终,您需要知道buildSequence创建的协程是同步协程的示例。在您的示例中,序列代码将在调用connect()消耗序列的同一线程中执行。

如何延迟序列?

正如我们所学,buildSequence创建一个同步序列。在这里使用常规线程阻塞是可以的:

fun connect(): Sequence<T> = buildSequence {
    while (true) {
        val result = dataSource.getFirstPage()
        yieldAll(result)
        Thread.sleep(1000)
    }
}

但是,你真的想要整个线程被阻塞吗?或者,你可以按照这里所述实现异步序列。因此,使用delay和其他暂停函数将是有效的。


谢谢,这很有道理。我没有意识到我正在创建一个同步协程。 - mattbdean
似乎你可以尝试写成:suspend { delay(DELAY) },这样它就能工作了(至少可以编译通过)。 - Anton Shkurenko
它只是创建并丢弃一个 lambda,即无操作。 - Anton3

2

仅提供另一种解决方案...

如果你真正想要做的是异步生成元素,你可以使用Flows,它们基本上是异步序列。

这里是一个快速表格:

同步 异步
单个 普通值

fun example(): String
挂起

suspend fun example(): String

fun example(): Deferred<String>
多个 序列

fun example(): Sequence<String>


fun example(): Flow<String>

你可以通过将sequence { ... }构建器替换为flow { ... }构建器,然后用emit/emitAll替换yield/yieldAll,将你的Sequence<T>转换为Flow<T>

fun example(): Flow<String> = flow {
    (1..5).forEach { getString().let { emit(it) } }
}

suspend fun getString(): String = { ... }

因此,以您的例子为例:
fun connect(): Flow<T> = flow {
    while (true) {

        // Call suspend function to get data from dataSource
        val result: List<T> = dataSource.getFirstPage()
        emitAll(result)

        // _Suspend_ for a little bit
        delay(1000)
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接