RxJava:链式观察者

43
能否使用RxJava实现类似于下一个链接的功能?
loginObservable()
   .then( (someData) -> {
      // returns another Observable<T> with some long operation
      return fetchUserDataObservable(someData);

   }).then( (userData) -> {
      // it should be called when fetching user data completed (with userData of type T)
      cacheUserData(userData);

   }).then( (userData) -> {
      // it should be called after all previous operations completed
      displayUserData()

   }).doOnError( (error) -> {
      //do something
   })

我发现这个库非常有趣,但是不知道如何链接请求,其中每个请求都依赖于前一个请求的结果。
3个回答

45
当然,RxJava 支持 .map 实现这个功能。来自 RxJava Wiki 的说明如下:

map

基本上就是这样:

loginObservable()
   .switchMap( someData -> fetchUserDataObservable(someData) )
   .map( userData -> cacheUserData(userData) )
   .subscribe(new Subscriber<YourResult>() {
        @Override
        public void onCompleted() {
           // observable stream has ended - no more logins possible
        }
        @Override
        public void onError(Throwable e) {
            // do something
        }
        @Override
        public void onNext(YourType yourType) {
            displayUserData();
        }
    });

1
据我理解,map函数将一个值转换为另一个值(例如,int -> String)。因此,即使我从第一个map返回Observable<T>,第二个map也会立即使用类型为Observable<T>的userData进行调用,但是当fetchUserData完成时,我希望它使用类型T进行调用。 - Mikhail
5
我认为switchMap可能是你想要的。 - Dylan
1
@Mikhail 抱歉,我的错 - 我离开了并没有看到评论。如果你正在消费一个可观察对象,那么 switchMap 确实是你需要的。 - Benjamin Gruenbaum
9
另外,“flatMap”提供了典型的链接行为,类似于被链接为then的Promise。 - André Staltz
我可以使用switchMap来链接多个调用吗? - ericn
显示剩余2条评论

15

当你在谷歌上搜索 RxJava链式可观察对象 时,这篇文章是排名第一的,我想再补充一个常见情况,即您不想 转换 收到的数据,而是将其与另一个操作链接起来(例如将数据设置为数据库)。使用 .flatmap()。以下是示例:

mDataManager
    .fetchQuotesFromApi(limit)
    .subscribeOn(mSchedulerProvider.io())
    .observeOn(mSchedulerProvider.ui())
    // OnErrorResumeNext and Observable.error() would propagate the error to
    // the next level. So, whatever error occurs here, would get passed to
    // onError() on the UI side.
    .onErrorResumeNext(Function { Observable.error<List<Quote>>(it) })
    .flatMap { t: List<Quote> ->
        // Chain observable as such
        mDataManager.setQuotesToDb(t).subscribe(
            {},
            { e { "setQuotesToDb() error occurred: ${it.localizedMessage}" } },
            { d { "Done server set" } }
        )
        Observable.just(t)
    }
    .subscribeBy(
        onNext = {},
        onError = { mvpView?.showError("No internet connection") },
        onComplete = { d { "onComplete(): done with fetching quotes from api" } }
    )

这是RxKotlin2,但是与RxJava和RxJava2的思想相同:
快速解释: - 我们尝试使用 `mDataManager.fetchQuotesFromApi()` 从 API 获取一些数据(此示例中为报价)。 - 我们订阅可观察对象以在 `.io()` 线程上执行操作并在 `.ui()` 线程上显示结果。 - `onErrorResumeNext()` 确保我们从获取数据遇到的任何错误都被捕获在该方法中。当有错误时,我想终止整个链,所以我返回一个 `Observable.error()` - `flatmap()` 是链接部分。我想能够将从 API 得到的任何数据设置为我的数据库。我没有使用 `map()` 转换接收到的数据,而是对该数据进行了其他处理而不是转换它。 - 我订阅最后一组 observable。如果出现获取数据的错误(第一个 observable),则会使用 `onErrorResumeNext()` 处理(在此情况下传播到订阅的 `onError()`) - 我非常注意我正在订阅 DB 可观察对象(在 `flatmap()` 中)。由于它在 `subscribe()` 方法中处理,因此通过此可观察对象发生的任何错误不会传播到最后的 `subscribeBy()` 方法中。
代码来自此项目,位于https://github.com/Obaied/Sohan/blob/master/app/src/main/java/com/obaied/dingerquotes/ui/start/StartPresenter.kt

你可以使用map()而不是flatMap()并返回Observable.just(t)。请注意,这两个版本都不会等待mDataManager.setQuotesToDb()的结果完成就返回了。 - yN.

0

尝试使用scan()

Flowable.fromArray(array).scan(...).subscribe(...)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接