使用Kotlin Flow异步发出流值

3
我正在使用Kotlin和Webflux构建一个简单的Spring服务。
我有一个返回Flow的端点。该Flow包含需要长时间计算的元素,这是通过delay模拟的。
它的构造方式如下:
suspend fun latest(): Flow<Message> {
    println("generating messages")


    return flow {
        for (i in 0..20) {
            println("generating $i")
            if (i % 2 == 0) delay(1000)
            else delay(200)
            println("generated messsage $i")
            emit(generateMessage(i))
        }
        println("messages generated")
    }
}

我原本期望的是它会按照Message1,然后是Message3、Message5…最后是Message0的顺序返回,因为每个生成器需要不同的延迟时间。

但实际上,流程以顺序包含了这些元素。

我想我在协程和流方面错过了一些重要的东西,我尝试使用不同的协程来实现我的想法,但我无法理解如何做到。

解决方案

正如Marko Topolnik和William Reed所指出的,使用channelFlow可以起到作用。

fun latest(): Flow<Message> {
    println("generating numbers")

    return channelFlow {
        for (i in 0..20) {
            launch {
                send(generateMessage(i))
            }
        }
    }
}

suspend fun generateMessage(i: Int): Message {
    println("generating $i")
    val time = measureTimeMillis {
        if (i % 2 == 0) delay(1000)
        else delay(500)
    }

    println("generated messsage $i in ${time}ms")
    return Message(UUID.randomUUID(), "This is Message $i")
}

当运行时,结果与预期相符。
generating numbers
generating 2
generating 0
generating 1
generating 6
...
generated messsage 5 in 501ms
generated messsage 9 in 501ms
generated messsage 13 in 501ms
generated messsage 15 in 505ms
generated messsage 4 in 1004ms
...

1
你不需要使用 Dispatchers.IO,只需编写 launch { ... } - Marko Topolnik
2
latest() 不需要成为一个挂起函数。 - Tenfour04
2个回答

1
一旦您开始使用每个元素的计算并发,您的第一个问题将是弄清楚何时完成所有计算。您需要提前知道要期望多少项。因此,我认为构建一个普通的List<Deferred<Message>>,然后在返回整个内容之前等待所有延迟是很自然的。在您的情况下,您没有从流中获得任何收益,因为流都是关于在流集合内同步执行任务的。您还可以使用channelFlow结合已知的消息计数,然后基于此终止流。优点是Spring可以更早地开始收集流。

编辑

实际上,计数问题并不存在:流程会自动等待您启动的所有子协程完成。


我会尝试使用channelFlow并发布我的结果。 - Linde_98

0

您目前的方法在整个函数中使用单个协程,包括for循环。这意味着任何调用suspend fun(例如delay)都会阻塞整个协程,直到它完成。它确实释放了线程去做其他事情,但当前协程被阻塞。

根据您提供的简化示例,很难说什么是正确的解决方案。如果您真的想要为每个for循环启动一个新的协程,那么可以在那里启动它,但似乎并不清楚这是否是正确的解决方案。


好的,但是我该怎么做呢?因为当我像这样做时: return flow { withContext(Dispatchers.Default){ launch { for (i in 0..20) { emit(generateMessage(i)) } println("numbers generated") } } } }我会得到一个异常:Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated: - Linde_98
1
我认为流程限制在于您需要在调用函数的同一协程上发出(类似于这样;异常会说明原因)。您将需要使用类似 channelFlow 的东西。再次强调,如果没有更多信息,我们无法提供更多帮助,因此很可能这不是正确的解决方案。 - William Reed
你需要什么更多的信息? - Linde_98

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接