从数据库和网络加载数据(Room + Retrofit + RxJava2)

4
我有一个示例API请求,返回用户观看列表的列表。当用户加载观看列表屏幕时,我想要实现以下流程:
  1. 立即从DB缓存中加载数据(cacheWatchList
  2. 在后台启动RetroFit网络调用。

    i. onSuccess返回apiWatchList
    ii. onError返回cacheWatchList

  3. cacheWatchListapiWatchList进行比较

    i. 相同 -> 一切正常,因为数据已经显示给用户,无需操作。

    ii. 不同 -> 将apiWatchList保存到本地存储并将其发送到下游。

到目前为止我做了什么? Watchlist.kt
data class Watchlist(
  val items: List<Repository> = emptyList()
)

LocalStore.kt (Android room)

  fun saveUserWatchlist(repositories: List<Repository>): Completable {    
    return Completable.fromCallable {      
      watchlistDao.saveAllUserWatchlist(*repositories.toTypedArray())
    }
  }

RemoteStore.kt (Retrofit api调用)


  fun getWatchlist(userId: UUID): Single<Watchlist?> {
    return api.getWatchlist(userId)
  }

DataManager.kt

  fun getWatchlist(userId: UUID): Flowable<List<Repository>?> {
    val localSource: Single<List<Repository>?> =
      localStore.getUserWatchlist()
        .subscribeOn(scheduler.computation)

    val remoteSource: Single<List<Repository>> = remoteStore.getWatchlist(userId)
      .map(Watchlist::items)
      .doOnSuccess { items: List<Repository> ->
        localStore.saveUserWatchlist(items)
          .subscribeOn(scheduler.io)
          .subscribe()
      }
      .onErrorResumeNext { throwable ->
        if (throwable is IOException) {
          return@onErrorResumeNext localStore.getUserWatchlist()
        }
        return@onErrorResumeNext Single.error(throwable)
      }
      .subscribeOn(scheduler.io)

    return Single.concat(localSource, remoteSource)
  }

上述流程的问题在于,它对于每个数据源向下游(presenter)调用onNext方法两次,即使这两个数据是相同的。我可以在presenter中进行数据差异逻辑并相应更新,但我希望DataManager类为我处理逻辑(Clean Architecture、SOC)。
我的问题是?
1. 实现上述逻辑的最佳方式是什么? 2. 我是否在DataManager(参见:doOnSuccess代码)中泄漏了内部订阅?当presenter被销毁时,我会处理外部订阅。

尝试过谷歌的paging库吗? - pskink
通过将默认适配器扩展为“PagedListAdapter”,我们可以解决这个问题。但是,“PagedListAdapter”会导致呈现层的结果发生变化,我希望数据层处理数据更改。 - blizzard
你的意思是什么?你不能使你的DataSource无效吗? - pskink
你的演示者只需要创建 LivePagedListProvider,而所有逻辑都将在你的自定义 DataSource 中实现。 - pskink
@pskink,我还没有尝试过分页库,只是读了一下相关内容。我将使用分页库来实现相同的功能,并在这里更新。我期望一个响应式解决方案,它结合了多个/链式的RxJava2操作符来解决问题,让我们看看。 - blizzard
显示剩余2条评论
1个回答

8
fun getWatchlist(userId: UUID): Observable<List<Repository>?> {
val remoteSource: Single<List<Repository>> = 
remoteStore.getWatchlist(userId)
        .map(Watchlist::items)
        .subscribeOn(scheduler.io)

return localStore.getUserWatchlist()
        .flatMapObservable { listFromLocal: List<Repository> ->
            remoteSource
                    .observeOn(scheduler.computation)
                    .toObservable()
                    .filter { apiWatchList: List<Repository> ->
                        apiWatchList != listFromLocal
                    }
                    .flatMapSingle { apiWatchList ->
                        localSource.saveUserWatchlist(apiWatchList)
                                .andThen(Single.just(apiWatchList))
                    }
                    .startWith(listFromLocal)
        }
}

逐步解释:

  1. 从localStore中加载数据。
  2. 使用flatMapObservable订阅remoteSource,每当localStore发出数据时都会执行一次。
  3. 由于内部observable有多个发射(来自本地的初始数据和来自remoteSource的更新数据),因此将Single转换为Observable。
  4. 将remoteSource的数据与localStore的数据进行比较,并仅在newData != localData的情况下处理数据。
  5. 对于每个过滤后的发射,初始化localSource以保存数据,并在操作完成后将保存的数据作为Single处理。
  6. 按要求,在远程请求数据之前应处理localStore中的数据,只需在操作符链的末尾添加startWith即可。

谢谢,问题已解决。请为您代码返回的部分添加一些文本说明,有益于整个社区。然后我会把它标记为答案。 - blizzard

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接