创建Kotlin协程Deferred对象以发出监听器回调结果。

5

我正在一个项目中从 RxJava 切换到 Kotlin 协程,用协程的替代方式替换所有的 Single 和 Observable 返回类型。我仍在努力理解以下构造:一个接口(例如 repository 的接口)提供数据查询访问,并返回一个 RxJava Single。实现使用 Single.create 创建一个 Single 对象,并使用 onSuccess/onError 发出结果。现在,实现需要做的是创建一个带有回调的监听器,并将该监听器注册到某些内容上。该创建的监听器的回调将调用自制 Single 的 onSuccess/onError。例如,使用 firebase(尽管我的问题不限于 firebase):

interface Repository {
    fun getData(query: Query): Single<DataSnapshot?>
}

fun getData(query: Query): Single<DataSnapshot?> = Single.create { emitter ->
    query.addListenerForSingleValueEvent(object : ValueEventListener {
        override fun onCancelled(error: DatabaseError?) {
            emitter.onError(Exception())
        }

        override fun onDataChange(data: DataSnapshot?) {
            emitter.onSuccess(data)
        }
    })
}

现在我希望的是接口方法返回coroutine Deferred。该如何创建实现以便还可以注册带有回调的监听器,其结果将由Deferred交付? 我没有看到使用这些协程构建程序(如async、launch等)完成onSuccess / onError所需的方式。

interface Repository {
    fun getData(query: Query): Deferred<DataSnapshot?>
}

1
如果你正在从 Rx 转向协程,那么你应该从 Single 转向挂起函数:suspend fun getData(query: Query): DataSnapshot?作为一个经验法则,你不应该有返回 Deferred 的函数。 - Roman Elizarov
好的,我会这样做,但我仍然不知道如何实现,我如何从监听器和回调函数中获得所需的挂起函数返回值DataSnapshot呢? - Lemao1981
2个回答

11

我的建议如下:

interface Repository {
    suspend fun getData(query: Query): Result<DataSnapshot>
}

Result可以是一个封装了成功和错误情况的密封类:

sealed class Result<T> {
    class Success<T>(result: T) : Result<T>()
    class Error<T>(error: String) : Result<T>()
}

这样,在getData的实现方面,您可以执行以下操作:
return Success(yourData)

或者

return Error("Something went wrong")

通常在处理协程时,应避免返回deferreds并尝试将它们用作“同步方法”。

编辑: 现在我理解了问题,希望这可以解决它:

//This is as generic as it gets, you could use it on any Query, no need to retype it
suspend fun Query.await(): DataSnapshot = suspendCoroutine{cont ->
    addListenerForSingleValueEvent(object : ValueEventListener{
        override fun onCancelled(error: DatabaseError?) {
            cont.resumeWithException(error?: Exception("Unknown Error"))
        }

        override fun onDataChange(data: DataSnapshot?) {
            if(data != null){
                cont.resume(data)
            } else {
                cont.resumeWithException(Exception("Null data"))
            }

        }
    })
}
//this is your actual implementation
suspend fun getData(query: Query):DataSnapshot =
        query.await()

这段代码假设DatabaseError扩展自Exception或Throwable。如果不是这样,您需要为它创建一个包装类型,或者使用我的原始解决方案,并在两种情况下都使用常规的resume。


仍未解决问题。我无法从回调方法中返回任何东西,这些方法旨在作为挂起函数的返回类型。这就是为什么首先要以所述方式完成Single.create构造的原因。 - Lemao1981
现在我更好地理解了这个问题。不幸的是,我现在没有时间解释如何从这种情况转换为协程。但是,在2017年kotlin conf中有两个关于协程的演讲,深入解释了它们,第二个演讲解释了如何做这种事情。 https://youtu.be/_hfBv0a09Jc https://youtu.be/YrrUCSi72E8 - Eric Martori

0

我认为最接近我所需的东西应该是ReceiveChannel。 我想出了这个解决方案:

override fun getData(query: Query): ReceiveChannel<Datasnapshot?> = GlobalScope.produce {
    query.addListenerForSingleValueEvent(object : ValueEventListener {
        override fun onDataChange(data: DataSnapshot?) {
            launch { send(data) } }
        }

        override fun onCancelled(error: DatabaseError?) {
            throw Exception()
        }
    })
}

不确定这是否有点过度,可能会有更好的选择,欢迎提供建议


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接