使用Kotlin协程Flow与Room和状态处理

5

我正在尝试使用新的协程流,我的目标是创建一个简单的存储库,可以从 Web API 获取数据并将其保存到数据库中,同时从数据库返回一个流。

我正在使用 Room 和 Firebase 作为 Web API,现在一切似乎都很简单,直到我尝试将来自 API 的错误传递给 UI。

由于我从数据库获取的流仅包含数据而没有状态,因此,将其与 Web API 结果组合以赋予其状态(例如加载,内容,错误)的正确方法是什么?

我编写的一些代码:

DAO:

@Query("SELECT * FROM users")
fun getUsers(): Flow<List<UserPojo>>

代码仓库:

val users: Flow<List<UserPojo>> = userDao.getUsers()

API调用:

override fun downloadUsers(filters: UserListFilters, onResult: (result: FailableWrapper<MutableList<UserApiPojo>>) -> Unit) {
    val data = Gson().toJson(filters)

    functions.getHttpsCallable("users").call(data).addOnSuccessListener {
        try {
            val type = object : TypeToken<List<UserApiPojo>>() {}.type
            val users = Gson().fromJson<List<UserApiPojo>>(it.data.toString(), type)
            onResult.invoke(FailableWrapper(users.toMutableList(), null))
        } catch (e: java.lang.Exception) {
            onResult.invoke(FailableWrapper(null, "Error parsing data"))
        }
    }.addOnFailureListener {
        onResult(FailableWrapper(null, it.localizedMessage))
    }
}

我希望问题已经足够清晰明了,非常感谢你的帮助。
编辑:由于问题不是很清楚,我将尝试澄清。我的问题是,默认情况下,Room发出的数据流只包含数据,因此如果我订阅该数据流,我只会收到数据(例如,在本例中,我只会收到用户列表)。我的需要是实现一种通知应用状态(例如加载或错误)的方法。目前,我唯一能想到的方法是使用包含状态的“响应”对象,但似乎无法找到实现它的方法。
类似于:
fun getUsers(): Flow<Lce<List<UserPojo>>>{
    emit(Loading())
    downloadFromApi()
    if(downloadSuccessful)
        return flowFromDatabase
    else
        emit(Error(throwable))
}

但是我遇到的明显问题是,来自数据库的流的类型为Flow<List<UserPojo>>,我不知道如何通过状态编辑流"丰富它",而不失去对数据库的订阅,并且在每次更新数据库时不需要运行新的网络调用(通过进行map转换)。
希望现在更清楚了。
2个回答

8

我认为这更像是一个架构问题,但让我先尝试回答一些你的问题。

我的问题是,使用 Room 默认的流程只会返回数据,如果我订阅这个流程,我只会收到数据。

如果 Room 返回的 Flow 存在错误,您可以通过 catch() 处理它。

我需要实现的是一种通知应用程序状态(如加载或错误)的方法。

我同意您拥有 State 对象是一个好的方法。在我看来,将 State 对象呈现给 ViewViewModel 的责任。这个 State 对象应该有一种方式来暴露错误。

目前我唯一能想到的方法是一个包含状态的“响应”对象,但我似乎找不到实现它的方法。

我发现将由ViewModel控制的State对象负责错误处理比起从Service层冒泡上来的对象更容易实现。
既然这些问题都解决了,让我试着提出一种特定的“解决方案”。
正如你所提到的,通常会有一个Repository来处理从多个数据源检索数据。在这种情况下,Repository将获取DAO和代表从网络获取数据的对象,我们称之为Api。我假设您正在使用FirebaseFirestore,因此类和方法签名看起来应该像这样:
class Api(private val firestore: FirebaseFirestore) {

fun getUsers() : Flow<List<UserApiPojo>

}

现在问题变成了如何将基于回调的API转换为Flow。幸运的是,我们可以使用callbackFlow()来实现这一点。然后Api变成:
class Api(private val firestore: FirebaseFirestore) {

fun getUsers() : Flow<List<UserApiPojo> = callbackFlow {

val data = Gson().toJson(filters)

functions.getHttpsCallable("users").call(data).addOnSuccessListener {
    try {
        val type = object : TypeToken<List<UserApiPojo>>() {}.type
        val users = Gson().fromJson<List<UserApiPojo>>(it.data.toString(), type)
        offer(users.toMutableList())
    } catch (e: java.lang.Exception) {
       cancel(CancellationException("API Error", e))
    }
}.addOnFailureListener {
    cancel(CancellationException("Failure", e))
    }
  }
}

如您所见,callbackFlow 允许我们在出现问题时取消流,并由下游处理错误。
转到 Repository,我们现在想做类似于: val users: Flow<List<User>> = Flow.concat(userDao.getUsers().toUsers(), api.getUsers().toUsers()).first() 这里有一些注意事项。first()concat()是你需要自己想出的运算符。我没有看到一个返回Flow的版本first();它是一个终端操作符(Rx以前有一个返回Observable的版本first(),Dan Lew在this中使用了它)。Flow.concat()似乎也不存在。 users的目标是返回一个Flow,该流会发出源Flows中任何一个发出的第一个值。此外,请注意,我正在将DAO用户和Api用户映射到一个公共的User对象中。
现在我们可以谈论ViewModel了。正如我之前所说,ViewModel应该有一个持有State的东西。这个State应该代表数据、错误和加载状态。可以通过数据类来实现这一点。

数据类State(val users: List<User>, val loading: Boolean, val serverError:Boolean)

由于我们可以访问Repository,因此ViewModel可能如下所示:

val state = repo.users.map {users -> State(users, false, false)}.catch {emit(State(emptyList(), false, true)}

请记住这只是一个大致的解释,指引您朝一个方向前进,管理状态有很多方法,这并不是一个完整的实现。例如,将API调用转换为Flow可能并没有意义。


1
我相信你可以使用 Flow.first() = take(1),而对于连接操作,Flow.concat(other) = flowOf(this, other).flattenConcat() - Marko Topolnik
你好@Emmanuel,感谢你的回答。正如你所说,这实际上是一个架构问题。你的答案解决了我的一些问题,但我需要一些澄清。由于评论部分不允许格式化评论,为了使其更易读,我将在页面上添加一个答案。谢谢。 - alessandro gaboardi

0

Emmanuel的答案非常接近我需要的答案,我需要关于其中某些内容的澄清。

将API调用转换为Flow可能甚至没有意义

你说得完全正确,事实上我只想把它变成一个协程,我并不需要它成为一个Flow。

如果Room返回的Flow出现错误,可以通过catch()处理它

是的,发布问题后我发现了这一点。但我的问题更像是:

我想调用一个名为"getData"的方法,该方法应该返回来自数据库的Flow,开始网络调用以更新数据库(这样我将通过数据库的Flow被通知更新完成),并且在这里的某个地方,我需要让UI知道如果数据库或网络出错了,对吧?还是我应该分别执行"getDbFlow"和"updateData",并分别获取每个方法的错误呢?

val users: Flow<User> = Flow.concat(userDao.getUsers().toUsers(), api.getUsers().toUsers()).first()

这是一个好主意,但我想保持数据库作为唯一的真相来源,并且从网络中不直接返回任何数据给用户界面。


抱歉,我现在无法发布长篇回答,但如果您还没有看过我分享的Dan Lew链接,我建议您去看一下。这个想法是,在第一次网络调用时将数据插入到数据库中。然后,由于数据库已经有了数据,它不会再去网络上获取数据,而是直接从数据库中返回值。 - Emmanuel

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接