RxJava在Android中的异步任务

14
我正在尝试在Android中使用RxJava实现异步任务。我尝试了以下代码,但它没有起作用。它在UI线程上执行。我正在使用RxAndroid 0.24.0版本。
try {
    Observable.just(someMethodWhichThrowsException())
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(s -> onMergeComplete());
}
catch (IOException e) {
    e.printStackTrace();
}

不过,对我来说,下面的代码可以异步工作。

Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            try {
                someMethodWhichThrowsException();
            } catch (IOException e) {
                e.printStackTrace();
            }

            subscriber.onCompleted();
        }
    });
    observable.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe();

我正在尝试理解以下内容:

  1. 它们之间有什么区别?
  2. 在创建异步任务时,最佳做法是什么?
2个回答

21
  1. 它们之间有什么区别?
Observable.just(someMethodWhichThrowsException())
    .subscribeOn(Schedulers.newThread())

这相当于以下内容:

Object someResult = someMethodWhichThrowsException();
Observable.just(someResult)
    .subscribeOn(Schedulers.newThread())

正如你所看到的,这将使同步方法调用先执行,然后将其传递给 Observable.just,以成为可观察对象。

Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            ...
        }
    })
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe();

然而,这种方法将在订阅时在call块上运行代码。您已告诉它您想在一个新线程上进行订阅(subscribeOn(Schedulers.newThread())),因此订阅发生在一个新线程上,并且在订阅时运行的代码(call块)也在该线程上运行。这类似于调用Observable.defer的行为。

  1. 创建异步任务时有哪些最佳实践?

嗯,这取决于您和所需的行为。有时您希望异步代码立即开始运行(在这种情况下,您可能需要使用其中一个操作符来缓存它)。我绝对会考虑使用Async Utils库来实现这一点。

其他时候,您只希望它在订阅时运行(这是这里的示例中的行为)——例如,如果有副作用,或者如果您不关心何时运行它,只想使用内置功能使某些东西离开UI线程。Dan Lew提到Observable.defer非常适合将旧代码转换为Rx并将其移出UI线程。


谢谢...这真的很有帮助。 - muneikh
没问题!其他答案已经给出了解决方案,但并没有提供太多的解释。我建议使用内置的 defer 而不是添加额外的库,特别是在 Android 上 - 保持 APK 大小不变 :) - Adam S
谢谢,我没有意识到。我认为这应该是推荐的方法。 :) - muneikh
Async Utils库似乎非常陈旧,已经近一年没有更新了。 - IgorGanapolsky
这是一个小型库,如果它稳定无需更新,尽管我理解这种情绪。他们目前正在接受拉取请求,所以它绝对不会死!无论如何,库所做的一切都可以自己构建,但在开始使用Rx时,它特别有用。 - Adam S

11

使用来自RxJava Async Utils库的Async.start()。这将在另一个线程上调用您提供的函数。

示例:

Observable<String> observable = Async.start(new Func0<String>() {
    @Override
    public String call() {
        try {
            return someMethodWhichThrowsException();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
});

正如您所指出的,必须将已检查异常包装到RuntimeException中。

另请参见https://github.com/ReactiveX/RxJava/wiki/Async-Operators#start


谢谢,这真的很有帮助。如果您对我原来的代码有什么建议,那就太好了。 - muneikh
1
我可以回答。Observable.just()允许从现有结果创建一个可观察对象。如果你写了Observable.just(myFunction()),myFunction()会被执行,然后结果会传递给just方法(就像任何Java表达式一样)。所以这并不是异步的。另一方面,当你调用Async.start()时,你的计算方法还没有被调用。它将在另一个线程中稍后被调用,当Rx调用func.call()时(其中func是提供给Async.start()的参数)。 - clemp6r
3
关于 Async.start,它会立即开始工作,甚至在订阅之前也是如此,有时这正是你想要的。如果你想等到订阅后再开始工作,可以使用 Async.toAction(...).call()Observable.defer。在这种情况下,defer 可以更好地处理异常,因为你可以通过 return Observable.error(new MyException()); 来抛出异常,而不是将其直接抛出,尽管抛出异常也会产生相同的结果。 - lopar
@lopar,RxJavaAsyncUtil提供了覆盖开始签名,该签名还包括一个“调度程序”,用于“运行函数”。我可以相信这个细节吗? - Eido95

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接