RxJava中的Observable.create与Observable.just(1).flatMap的区别

3

我一直在学习RxJava,经常遇到需要将一些现有代码的结果转换成Observable的情况。

例如,我们来看以下示例:

ListenableFuture<T> result = request.executeAsync();
return result;

所以将其转换为可观察对象的最简单方法是执行以下操作:

ListenableFuture<T> result = request.executeAsync();
return Observable.from(result);

问题在于executeAsync在调用时实际上执行了请求。我想要的是将该调用延迟到被订阅的可观察对象。

我考虑了两种方法来实现这一点。

return Observable.create { aSubscriber ->
    if (!aSubscriber.unsubscribed) {
        aSubscriber.onNext(request.executeAsync())
    }
    if (!aSubscriber.unsubscribed) {
        aSubscriber.onCompleted()
    }
}

and

return Observable
    .just(1)
    .flatMap((_) -> Observable.from(request.executeAsync()));

在我看来,使用flatMap选项更简单,因为我不需要关心订阅者逻辑。

在使用create的时候有什么陷阱吗?有没有更好的Rx方式来简化集成?

谢谢


1
事实上,create() 比 flatMap() 更容易出现问题。 - akarnokd
2个回答

2
你可以使用 defer 替代:
Observable.defer(() -> request.executeAsync())
          .concatMap(Observable::from)
          .subscribe();

糟糕。回答得太快了:延迟(defer)不能解决你的问题。 - dwursteisen
我必须问一下:这个方法为什么不能解决我的问题?在我看来,它似乎会推迟调用直到可观察对象被订阅。或者是我误解了文档? - Crystark
因为它将发出ListenableFuture而不是ListenableFuture的值。所以我编辑了我的答案以发出该值。 - dwursteisen
2
这看起来可能可行,但我会选择 Observable.defer(() -> Observable.from(request.executeAsync()))。我有什么遗漏的吗?或者这会像我期望的那样运作吗? - Crystark

0

RxJavaGuava 是一个小型库,可以将 ListenableFuture 转换为 Observable,因此最好使用它而不是“自制”解决方案。 它允许您编写

ListenableFutureObservable.from(request.executeAsync(), Schedulers.computation())

根据请求的功能,您需要选择正确的调度程序。

我的帖子中实际上没有什么自己制作的东西。这全部都是基本内置的RxJava。我不确定你提出的建议是否会改变与RxJava的Observable.from(request.executeAsync())相比的任何内容,因为这将在调用时立即执行请求。 - Crystark
@Crystark 你说得对,我提出的方案也必须包装在 Observable.defer 中。但是,你建议的 Observable.defer(() -> Observable.from(request.executeAsync())) 更简单,所以你可以直接使用它(除非你想在 RxJava 线程池上运行请求,那么 RxJavaGuava 很有帮助)。 - Samuel Gruetter
好的,谢谢。顺便说一下,我认为你可以使用内置的RxJava线程池运行请求,使用Observable.from(Future, Scheduler) - Crystark

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接