使用Kotlin协程的NetworkBoundResource

19
你有没有想过如何使用NetworkBoundResource和 Kotlin 协程来实现存储库模式?我知道我们可以在 GlobalScope 中启动协程,但这可能会导致协程泄漏。我希望将 viewModelScope 作为参数传递,但在实现时有些棘手(因为我的存储库不知道任何 ViewModel 的 CoroutineScope)。
abstract class NetworkBoundResource<ResultType, RequestType>
@MainThread constructor(
    private val coroutineScope: CoroutineScope
) {

    private val result = MediatorLiveData<Resource<ResultType>>()

    init {
        result.value = Resource.loading(null)
        @Suppress("LeakingThis")
        val dbSource = loadFromDb()
        result.addSource(dbSource) { data ->
            result.removeSource(dbSource)
            if (shouldFetch(data)) {
                fetchFromNetwork(dbSource)
            } else {
                result.addSource(dbSource) { newData ->
                    setValue(Resource.success(newData))
                }
            }
        }
    }

    @MainThread
    private fun setValue(newValue: Resource<ResultType>) {
        if (result.value != newValue) {
            result.value = newValue
        }
    }

    private fun fetchFromNetwork(dbSource: LiveData<ResultType>) {
        val apiResponse = createCall()
        result.addSource(dbSource) { newData ->
            setValue(Resource.loading(newData))
        }
        result.addSource(apiResponse) { response ->
            result.removeSource(apiResponse)
            result.removeSource(dbSource)
            when (response) {
                is ApiSuccessResponse -> {
                    coroutineScope.launch(Dispatchers.IO) {
                        saveCallResult(processResponse(response))

                        withContext(Dispatchers.Main) {
                            result.addSource(loadFromDb()) { newData ->
                                setValue(Resource.success(newData))
                            }
                        }
                    }
                }

                is ApiEmptyResponse -> {
                    coroutineScope.launch(Dispatchers.Main) {
                        result.addSource(loadFromDb()) { newData ->
                            setValue(Resource.success(newData))
                        }
                    }
                }

                is ApiErrorResponse -> {
                    onFetchFailed()
                    result.addSource(dbSource) { newData ->
                        setValue(Resource.error(response.errorMessage, newData))
                    }
                }
            }
        }
    }
}

3
在我看来,代码库应该暴露出suspend函数或返回Channel/Flow对象,具体取决于API的性质。实际上,协程是在ViewModel中设置的。LiveData由ViewModel引入,而不是由代码库引入。 - CommonsWare
@CommonsWare 您建议重写NetworkBoundResource,以返回实际数据(或资源<T>),而不使用它和存储库中的LiveData吗? - Kamil Szustak
你是想使用NetworkBoundResource的人。我的评论更为一般:在我看来,Kotlin仓库实现应该暴露与协程相关的API。 - CommonsWare
我想感谢所有人对我的问题和不同答案所给予的帮助。同时感谢 @CommonsWare,他的提示帮助我再次改进了代码。 - Valerio
@CommonsWare,您建议不要使用Room数据库与LiveData吗? - maxbeaudoin
2
我更倾向于将其表述为个人偏好。LiveData缺乏RxJava或Kotlin协程的能力。 LiveData非常适用于与活动或片段的“最后一英里”通信,并考虑到了这一点而设计。对于小型应用程序,如果您想跳过存储库并使ViewModel直接与RoomDatabase通信,则LiveData是可以的。 - CommonsWare
3个回答

30

更新 (2020-05-27):

更符合 Kotlin 语言惯用方式的一种方法,使用 Flow API,并借鉴 Juan 的答案,可以表示为以下独立函数:

inline fun <ResultType, RequestType> networkBoundResource(
    crossinline query: () -> Flow<ResultType>,
    crossinline fetch: suspend () -> RequestType,
    crossinline saveFetchResult: suspend (RequestType) -> Unit,
    crossinline onFetchFailed: (Throwable) -> Unit = { Unit },
    crossinline shouldFetch: (ResultType) -> Boolean = { true }
) = flow<Resource<ResultType>> {
    emit(Resource.Loading(null))
    val data = query().first()

    val flow = if (shouldFetch(data)) {
        emit(Resource.Loading(data))

        try {
            saveFetchResult(fetch())
            query().map { Resource.Success(it) }
        } catch (throwable: Throwable) {
            onFetchFailed(throwable)
            query().map { Resource.Error(throwable, it) }
        }
    } else {
        query().map { Resource.Success(it) }
    }

    emitAll(flow)
}

以上代码可以从一个类(例如Repository)中调用,方法如下:
fun getItems(request: MyRequest): Flow<Resource<List<MyItem>>> {
    return networkBoundResource(
        query = { dao.queryAll() },
        fetch = { retrofitService.getItems(request) },
        saveFetchResult = { items -> dao.insert(items) }
    )
}

Translated answer:

我一直使用livedata-ktx工件来实现,不需要传入任何CoroutineScope。该类还仅使用一种类型,而不是两种类型(例如ResultType/RequestType),因为我总是在其他地方使用适配器来映射它们。

import androidx.lifecycle.LiveData
import androidx.lifecycle.liveData
import androidx.lifecycle.map
import nihk.core.Resource

// Adapted from: https://developer.android.com/topic/libraries/architecture/coroutines
abstract class NetworkBoundResource<T> {

    fun asLiveData() = liveData<Resource<T>> {
        emit(Resource.Loading(null))

        if (shouldFetch(query())) {
            val disposable = emitSource(queryObservable().map { Resource.Loading(it) })

            try {
                val fetchedData = fetch()
                // Stop the previous emission to avoid dispatching the saveCallResult as `Resource.Loading`.
                disposable.dispose()
                saveFetchResult(fetchedData)
                // Re-establish the emission as `Resource.Success`.
                emitSource(queryObservable().map { Resource.Success(it) })
            } catch (e: Exception) {
                onFetchFailed(e)
                emitSource(queryObservable().map { Resource.Error(e, it) })
            }
        } else {
            emitSource(queryObservable().map { Resource.Success(it) })
        }
    }

    abstract suspend fun query(): T
    abstract fun queryObservable(): LiveData<T>
    abstract suspend fun fetch(): T
    abstract suspend fun saveFetchResult(data: T)
    open fun onFetchFailed(exception: Exception) = Unit
    open fun shouldFetch(data: T) = true
}

就像 @CommonsWare 在评论中提到的那样,但是,直接暴露一个 Flow<T> 会更好。以下是我尝试用来实现这一点的代码。请注意,我尚未在生产中使用此代码,所以买家自负。

import kotlinx.coroutines.flow.*
import nihk.core.Resource

abstract class NetworkBoundResource<T> {

    fun asFlow(): Flow<Resource<T>> = flow {
        val flow = query()
            .onStart { emit(Resource.Loading<T>(null)) }
            .flatMapConcat { data ->
                if (shouldFetch(data)) {
                    emit(Resource.Loading(data))

                    try {
                        saveFetchResult(fetch())
                        query().map { Resource.Success(it) }
                    } catch (throwable: Throwable) {
                        onFetchFailed(throwable)
                        query().map { Resource.Error(throwable, it) }
                    }
                } else {
                    query().map { Resource.Success(it) }
                }
            }

        emitAll(flow)
    }

    abstract fun query(): Flow<T>
    abstract suspend fun fetch(): T
    abstract suspend fun saveFetchResult(data: T)
    open fun onFetchFailed(throwable: Throwable) = Unit
    open fun shouldFetch(data: T) = true
}

如果您在此处设置断点 if (shouldFetch(data)),您将看到它被调用了两次。第一次是当您从数据库获取结果时,第二次是当调用 saveFetchResult(fetch()) 时。 - Juan Cruz Soler
如果你仔细想一想,这就是你使用Flow时想要的结果。你正在将某些东西保存在数据库中,并希望Room通知你有关该更改并再次调用你的flatMapConcat代码。如果你不想要这种行为,那么你就不应该使用T而不是Flow<T> - Juan Cruz Soler
也许你没有从数据库返回 Flow<List<T>>,而你应该这样做。请查看 https://issuetracker.google.com/issues/130428884。 - Juan Cruz Soler
4
你是正确的,我误解了代码。flatMapConcat会返回一个新的Flow以供观察,因此最初的Flow将不再被调用。
两个答案的行为方式相同,所以我会保留自己的实现方式。
对于造成的困惑,我很抱歉,并感谢你的解释!
- Juan Cruz Soler
1
@FlorianWalther 关于你的第二个问题,我在这里给出了答案:https://stackoverflow.com/a/65984833/2997980 - N1hk
显示剩余14条评论

10

@N1hk 的回答是正确的,这只是一个不使用 flatMapConcat 操作符的不同实现(它目前被标记为 FlowPreview

@FlowPreview
@ExperimentalCoroutinesApi
abstract class NetworkBoundResource<ResultType, RequestType> {

    fun asFlow() = flow {
        emit(Resource.loading(null))

        val dbValue = loadFromDb().first()
        if (shouldFetch(dbValue)) {
            emit(Resource.loading(dbValue))
            when (val apiResponse = fetchFromNetwork()) {
                is ApiSuccessResponse -> {
                    saveNetworkResult(processResponse(apiResponse))
                    emitAll(loadFromDb().map { Resource.success(it) })
                }
                is ApiErrorResponse -> {
                    onFetchFailed()
                    emitAll(loadFromDb().map { Resource.error(apiResponse.errorMessage, it) })
                }
            }
        } else {
            emitAll(loadFromDb().map { Resource.success(it) })
        }
    }

    protected open fun onFetchFailed() {
        // Implement in sub-classes to handle errors
    }

    @WorkerThread
    protected open fun processResponse(response: ApiSuccessResponse<RequestType>) = response.body

    @WorkerThread
    protected abstract suspend fun saveNetworkResult(item: RequestType)

    @MainThread
    protected abstract fun shouldFetch(data: ResultType?): Boolean

    @MainThread
    protected abstract fun loadFromDb(): Flow<ResultType>

    @MainThread
    protected abstract suspend fun fetchFromNetwork(): ApiResponse<RequestType>
}


1
在ApiErrorResponse情况下,发出Resource.error不是更好吗? - Kamil Szustak
Retrofit的返回类型应该是什么? - Mahmood Ali
@MahmoodAli 暂停 fun someData(@Query/@Path): ApiResponse<List<Postitems>> ... 根据您的数据进行管理 - USMAN osman
2
这种方法在加载过程中不会发出数据库更新,有时这是必要的。 - Florian Walther

0

我是 Kotlin Coroutine 的新手。我这周刚遇到了这个问题。

我认为,如果你按照上面的帖子使用存储库模式,我的建议是可以自由地将 CoroutineScope 传递给 NetworkBoundResourceCoroutineScope 可以成为 Repository 函数的参数之一,该函数返回 LiveData,例如:

suspend fun getData(scope: CoroutineScope): LiveDate<T>

在调用ViewModel中的getData()时,将内置作用域viewmodelscope作为CoroutineScope传递进去,这样NetworkBoundResource就可以在viewmodelscope中工作并与ViewModel的生命周期绑定。 NetworkBoundResource中的协程将会在ViewModel销毁时被取消,这是一个好处。

要使用内置作用域viewmodelscope,请不要忘记在您的build.gradle中添加以下内容。

implementation 'androidx.lifecycle:lifecycle-viewmodel-ktx:2.2.0-alpha01'

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接