RxJava - 何时使用带有create方法的Observable

4
我正在阅读一篇教程:http://code.tutsplus.com/tutorials/getting-started-with-reactivex-on-android--cms-24387,它涉及RxAndroid,但与RxJava几乎相同。我不确定是否完全理解了该概念。
下面我写了一个方法和一个示例用法。
我的问题是:这是实现函数的正确方式吗,以便我可以在其他线程上异步运行它们? 实际上,它们只会返回创建的Observable,运行真正的代码并处理错误等所有内容。
或者说这是错误的,那么我想知道正确的方法。
Observable<String> googleSomething(String text){
    return Observable.create(new Observable(){
        @Override
        public void call(Subscriber<? super String> subscriber) {
             try {
                String data = fetchData(text); // some normal method
                subscriber.onNext(data); // Emit the contents of the URL
                subscriber.onCompleted(); // Nothing more to emit
            } catch(Exception e) {
                subscriber.onError(e); // In case there are network errors
            }
        }
    });
}

googleSomething("hello world").subscribeOn(Schedulers.io()).observeOn(Schedulers.immediate()).subscribe(...)

Schedulers.immediate()是否用于在当前线程上执行订阅代码?它在javadoc中说:“创建并返回一个调度程序,立即在当前线程上执行工作。”,但我不确定。

4个回答

9
除非您更有经验且需要自定义运算符或想要连接遗留的addListener/removeListener基于API,否则不应该从create开始。有几个StackOverflow上的问题使用了create并成为麻烦的源头。
我更喜欢fromCallable,它让您生成单个值或抛出异常,因此无需那些冗长的defer+just源。 Schedulers.immediate()立即在调用者线程上执行其任务,这是您示例中的io()线程,而不是主线程。目前,没有支持将计算移回到Java主线程,因为它需要阻塞跳板,通常也是一个坏主意。

如果我没有指定observeOn线程,那么默认行为是什么?那么我应该为订阅者使用哪个线程呢? - Greyshack
取决于您对序列结果的处理方式。 - akarnokd

2

作为初学者,你几乎不应该使用create()。有更简单的方法创建可观察对象,而且create()很难正确实现。

大多数情况下,你可以通过使用defer()轻松避开create()。例如,在这种情况下,你可以这样做:

Observable<String> googleSomething(String text) {
  return Observable.defer(new Func0<Observable<String>>() {
    @Override
    public Observable<String> call() {
      try {
        return Observable.just(fetchData(text));
      } catch (IOException e) {
        return Observable.error(e);
      }
    }
  });
}

如果您没有使用受检异常,那么甚至可以摆脱try-catch。RxJava会自动将任何 RuntimeException 转发到订阅者的 onError() 部分。请保留 html 标签。

Schedulers.immediate()怎么样?使用这个调度器和完全不调用.observeOn有什么区别吗? - Greyshack
immediate() 在这种情况下不起作用 - 它在当前线程上执行,该线程是由 io() 设置的。省略它也不会有任何区别。您真正需要检查的是您想要订阅的确切线程,并为其设置调度程序。 - Dan Lew

1

你的代码看起来很好。如果你不确定它是否在另一个线程上运行,可以在调用.subscribe()之后立即打印一些内容并查看输出的顺序。

googleSomething("hello world").subscribeOn(Schedulers.io()).observeOn(Schedulers.immediate()).subscribe(...) 
System.out.println("This should be printed first");

尝试在fetchData()中模拟长时间运行的操作,并立即打印其他内容。由于.subscribe()是非阻塞的,因此"这应该首先被打印"实际上将首先被打印。或者,您可以使用以下方法打印当前线程。
Thread.currentThread().getName()

在你的可观察对象内外使用它,输出应该有所不同。


1
你可以通过使用Observable.create(new OnSubscribe {})方法来创建Observable,但是:
  • 看一下defer()操作符,它允许你返回例如Observable.just()和Observable.error(),这样你就不需要直接操作subscriber了
  • 最好使用SyncOnSubscribe/AsyncOnSubscribe来处理背压

Schedulers.immediate()会将Observable的处理保持在当前线程上,因此在你的情况下,它将是Schedulers.io线程之一。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接