将回调流转换为共享流

4

我刚刚开始使用协程/流(以及Kotlin),尝试将callbackFlow转换为sharedFlow时遇到了困难。

我编写了下面的简单示例,只是为了展示我尝试过的内容,但并没有成功。我的代码更加复杂,但我认为这个例子反映了我想要实现的问题所在。

fun main() = runBlocking {

    getMySharedFlow().collect{
        println("collector 1 value: $it")
    }

    getMySharedFlow().collect{
        println("collector 2 value: $it")
    }

}

val sharedFlow = MutableSharedFlow<Int>()

suspend fun getMySharedFlow(): SharedFlow<Int> {
    println("inside sharedflow")
    getMyCallbackFlow().collect{
        println("emitting to sharedflow value: $it")
        sharedFlow.emit(it)
    }
    return sharedFlow
}

fun getMyCallbackFlow(): Flow<Int> = callbackFlow<Int> {
    println("inside callbackflow producer")
    fetchSomethingContinuously {
        println("fetched something")
        offer(1)
        offer(2)
        offer(3)
    }
    awaitClose()
}

fun fetchSomethingContinuously(myCallBack: ()->Unit) {
    println("fetching something...")
    myCallBack()
}

这个想法是,fetchSomethingContinuously 只被调用一次,无论共享流的收集器数量如何。但正如您从输出中看到的那样,收集器从未获得值。
inside sharedflow
inside callbackflow producer
fetching something...
fetched something
emitting to sharedflow value: 1
emitting to sharedflow value: 2
emitting to sharedflow value: 3

我看了一下shareIn操作符,但不确定如何确切使用它。

我应该如何实现类似的功能呢?非常感谢任何提示。

1个回答

6

你所忽略的是对 collect, emit(), 和 awaitClose() 的调用都是挂起操作,只有在对应的操作完成后才会结束。

getMySharedFlow() 函数甚至没有返回就已经应用了 collect 操作,因为它正在收集 callbackFlow。而 callbackFlow 在调用 awaitClose() 后就停滞了,而 awaitClose() 又无法结束,因为 fetchSomethingContinuously 没有使用 close() 函数结束回调函数。

在这里需要明确并行性,不要一味地使用挂起操作。以下是您示例代码的工作版本:

val sharedFlow = MutableSharedFlow<Int>()

suspend fun startSharedFlow() {
    println("Starting Shared Flow callback collection")

    getMyCallbackFlow().collect {
        println("emitting to sharedflow value: $it")
        sharedFlow.emit(it)
    }
}

fun main() = runBlocking<Unit> {

    launch {
        startSharedFlow()
    }

    launch {
        sharedFlow.collect {
            println("collector 1 value: $it")
        }
    }

    launch {
        sharedFlow.collect {
            println("collector 2 value: $it")
        }
    }

}


fun getMyCallbackFlow(): Flow<Int> = callbackFlow<Int> {
    println("inside callbackflow producer")
    fetchSomethingContinuously {
        println("fetched something")
        offer(1)
        offer(2)
        offer(3)
        //close() - call close here if you need to signal that this callback is done sending elements
    }
    awaitClose()
}

fun fetchSomethingContinuously(myCallBack: () -> Unit) {
    println("fetching something...")
    myCallBack()
}

launch 调用允许异步执行发射和收集值。

此外,关于 shareIn() 运算符,它只是从指定的上游创建一个 SharedFlow,就像你想要做的那样。另外,你可以使用 started 参数指定何时开始共享。更多信息请参见此处

这是在你的示例中如何使用它:

fun main() = runBlocking<Unit> {

    val sharedFlow = getMyCallbackFlow().shareIn(this, started = SharingStarted.Eagerly)

    launch {
        sharedFlow.collect {
            println("collector 1 value: $it")
        }
    }

    launch {
        sharedFlow.collect {
            println("collector 2 value: $it")
        }
    }

}


fun getMyCallbackFlow(): Flow<Int> = callbackFlow<Int> {
    println("inside callbackflow producer")
    fetchSomethingContinuously {
        println("fetched something")
        offer(1)
        offer(2)
        offer(3)
        //close() - call close here if you need to signal that this callback is done sending elements
    }
    awaitClose()
}

fun fetchSomethingContinuously(myCallBack: () -> Unit) {
    println("fetching something...")
    myCallBack()
}

1
非常感谢您提供的详细解释和示例。它们对我解决问题非常有帮助 :) - amp
1
不客气!学习 Kotlin,特别是像协程和流这样更复杂的主题,Respect++。它们的设计非常棒,对每个项目都有很大的帮助。 - Horațiu Udrea

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接