RxAndroid视图可观察对象 NetworkOnMainThreadException

3

我有一个Button,我从中创建了一个Observable<OnClickEvent>

当点击这个按钮时,我希望从网络上获取一个文件,但我遇到了与网络和线程相关的问题。

这个例子会抛出android.os.NetworkOnMainThreadException

Observable<OnClickEvent> networkButtonObservable = ViewObservable.clicks(testNetworkButton);
networkButtonObservable
    .map(new Func1<OnClickEvent, List<String>>() {
             @Override
             public List<String> call(OnClickEvent onClickEvent) {
                 return TestAPI.getTestService().fetchTestResponse();
             }
         }
    )
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Object>() {
                   @Override
                   public void call(Object o) {Log.w("Final result: " + o);
                   }
               }
    );

所以我尝试从另一个线程开始。

以下代码会抛出异常 rx.exceptions.OnErrorNotImplementedException: Observers must subscribe from the main UI thread, but was Thread[RxNewThreadScheduler-1,5,main] :

networkButtonObservable
    .subscribeOn(Schedulers.newThread())
    .map(new Func1<OnClickEvent, List<String>>() {
             @Override
             public List<String> call(OnClickEvent onClickEvent) {
                 return TestAPI.getTestService().fetchTestResponse();
             }
         }
    )
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Object>() {
                   @Override
                   public void call(Object o) {Log.w("Final result: " + o);
                   }
               }
    );

好的,现在我尝试在开头使用 .debounce()

networkButtonObservable
    .debounce(10, TimeUnit.MILLISECONDS)
    .map(new Func1<OnClickEvent, List<String>>() {
             @Override
             public List<String> call(OnClickEvent onClickEvent) {
                 return TestAPI.getTestService().fetchTestResponse();
             }
         }
    )
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Object>() {
                   @Override
                   public void call(Object o) {Log.w("Final result: " + o);
                   }
               }
    );

这个成功了。

显然,我不喜欢在我的代码中添加延迟,所以我正在尝试弄清楚线程方面的情况。为什么第一个例子没有在后台线程中执行.map()内部的代码呢?

或者说我漏掉了什么?

--- 更新

我将我的TestAPI更改为返回Observable,并将对networkButtonObservable的第一个调用更改为.flatMap()。这也可以正常运行。但我仍然不知道使用.map()的原始方式为什么会失败。

networkButtonObservable
    .flatMap(new Func1<OnClickEvent, Observable<?>>() {
        @Override
        public Observable<?> call(OnClickEvent onClickEvent) {
            return TestAPI.getTestService().fetchTestResponseObservable();
        }
    })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Object>() {
                   @Override
                   public void call(Object o) {Log.w("Final result: " + o);
                   }
               }
    );

就像错误提示所说的那样,在UI线程中不能运行网络操作。 - Carnal
哪一行代码抛出了这个异常:“观察者必须从主UI线程中订阅,但是...”?我认为你在另一个线程中订阅了Observable而不是UI线程。(但我不确定) - dwursteisen
.subscribeOn(Schedulers.io()) 必须放在你的 map 后面,因为 map 中的函数在订阅时被调用。 - njzk2
1个回答

10

我不是Android专家,但根据错误消息,我认为您需要在主线程和后台线程之间传递值。通常,Android示例会向您展示如何将subscribeOn/observeOn一对添加到流处理中:

Observable.just(1)
.map(v -> doBackgroundWork())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(v -> {});

但在这种情况下,“源”通常是你可以控制的冷观察者。

在你的问题中,源是一个具有特定要求的热Observable,你需要在主线程上订阅它,然后需要在后台线程上进行网络调用,最后在主线程上显示结果。

在这种情况下,你可以多次使用observeOn

networkButtonObservable
.subscribeOn(AndroidSchedulers.mainThread()) // just in case
.observeOn(Schedulers.io())
.map(v -> TestAPI.getTestService().fetchTestResponse())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(v -> updateGUI(v));

我认为fetchTestResponseObservable已经应用了自己的subscribeOnobserveOn,因此它不会抛出网络异常。

另外,我想提一下,使用多个subscribeOn在功能上等同于只使用距离发射源最近的一个,但从技术上讲,这将占用未使用的线程资源。然而,在流中使用多个observeOn是有意义的,因为你可以使用它们在线程之间“管道化”地处理流。


谢谢您的解释。我现在对 Rx 的内部工作原理有了更好的理解。 - xorgate

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接