如何从Kotlin回调中拆分“热”的事件流?

6
我正在处理通过回调到达的热事件流。在“下游”,我想将其拆分成多个流并进行处理。这些事件是按顺序从单个线程(我无法控制,因此不认为可以在此处使用协同程序)中依次到达的。
在这里使用什么结构是正确的?
我可以很容易地创建一个 Flow,使用 callbackFlow 和 sendBlocking,但语义似乎不匹配,因为 Flow 不是冷的。将 Flow 拆分为多个下游流(取决于事件内容)的最佳方法是什么?或者我应该使用 Channel?它与我的源的“热度”相匹配,但整个轮询下游似乎有些偏差(在这种基本上同步的情况下),并且许多方法似乎已被淘汰,而改用 Flow。
我可以通过只使用“全部回调”来完成所有这些工作,但这会创建比我希望的更紧密的耦合。有什么好的想法吗?
编辑:
最终我得到了这个,看起来可以工作:
    fun testFlow() {
        runBlocking {
            val original = flowOf("aap", "noot", "mies", "wim", "zus","jet","weide","does")
            val broadcast = original.broadcastIn(this)
            val flow1 = broadcast.openSubscription().receiveAsFlow().filter { it.length == 4 }
            val flow2 = broadcast.openSubscription().receiveAsFlow().filter { it.length == 3 }
            flow1.collect { it -> println("Four letter: ${it}") }
            flow2.collect { it -> println("Three letter: ${it}") }
        }
    }

1
使用receiveAsFlow()不会自动取消订阅通道,因此在这种情况下应该优先选择consumeAsFlow()。话虽如此,openSubscription().consumeAsFlow()会立即创建2个热流,而使用asFlow()实际上将“子流”保持为冷态,只有在收集时才会创建订阅。 - Joffrey
1个回答

16

简短回答

很快就会有一个热的SharedFlow用于此用例,但在此期间,您可以在幕后使用BroadcastChannel

您可以使用callbackFlow从基于回调的API创建冷流(请参见Roman Elizarov的post about it)。 然后使用以下内容使其变为热流并共享:

val original: Flow<String> = TODO("get original flow")

// create an implicit hot BroadcastChannel, shared between collectors
val sharedFlow = original.broadcastIn(scope).asFlow()

// create derived cold flows, which will subscribe (on collect) to the
// same hot source (BroadcastChannel)
val flow1 = sharedFlow.filter { it.length == 4 }
val flow2 = sharedFlow.filter { it.length == 3 }.map { it.toUppercase() }

flow1.collect { it -> println("Four letter: ${it}") }
flow2.collect { it -> println("Three letter: ${it}") }

让流变热(当前方法)

首先需要澄清的是,即使目前大多数Flow都是冷的,已经有了一个热的StateFlow,很快就会有一个方便的share运算符和一个热的SharedFlow来简化这种用例。

在等待时,如果您最初有一个冷的Flow,则当前必须首先创建一个热通道(和一个用于向其中发送元素的协程),然后从中派生共享热源的流。可以通过以下方式之一轻松完成此操作:

  • Flow.produceIn(scope)在给定作用域中启动协程并为您提供ReceiveChannel(对于扇出很有用,请参见下文)
  • Flow.broadcastIn(scope)在给定作用域中启动协程并为您提供BroadcastChannel(对于实际共享很有用,请参见下文)

一旦您拥有了一个热通道,就可以将其转换为流并获得不同的行为:

  • ReceiveChannel.consumeAsFlow()从热源创建一个Flow,但只能由单个收集器collect(否则会抛出异常)
  • ReceiveChannel.receiveAsFlow()创建一个多收集器Flow,但它以扇出方式运行(源通道的每个元素仅传递给一个使用者)
  • BroadcastChannel.asFlow()创建一个多收集器Flow,其中每个收集器都获取所有元素(实际上是共享)。调用collect会在BroadcastChannel上创建一个新的订阅,并正确处理取消。

带有StateFlow的“最新状态”语义

这不是你的使用场景,但有时您可能不需要流中所有值,而仅需最新的当前状态和状态更新。
以前可以通过 ConflatedBroadcastChannel 来实现,但现在可以使用StateFlow来表示这一点(自协程 1.3.6 开始):
  • 生产者一侧,设置一个 MutableStateFlow 的值。
  • 消费者一侧,每个收集器在开始时都会得到当前状态,然后每次状态值与上一个值不同时,都会获得一个新的状态值(基于equality)。

我拼凑了一个例子,我觉得没问题,谢谢你。 - Frank Lee
1
@FrankLee,我所描述的与你正在做的有些类似。我只是利用asFlow()来消除显式通道订阅,因为它会自动处理底层通道取消。请参见我基于你的片段提供的示例代码。 - Joffrey

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接