RxJava2可观察对象take方法会抛出UndeliverableException异常。

43
据我理解,RxJava2中的values.take(1)会创建另一个Observable,其中只包含原始Observable中的一个元素。由于它是通过take(1)过滤掉的,因此绝不能抛出异常,因为它发生在第二个位置。
例如以下代码片段:
    Observable<Integer> values = Observable.create(o -> {
        o.onNext(1);
        o.onError(new Exception("Oops"));
    });

    values.take(1)
            .subscribe(
                    System.out::println,
                    e -> System.out.println("Error: " + e.getMessage()),
                    () -> System.out.println("Completed")
            );

输出

1
Completed
io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
    at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
    at ch02.lambda$main$0(ch02.java:28)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.Observable.subscribe(Observable.java:10827)
    at io.reactivex.Observable.subscribe(Observable.java:10787)
    at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
    ... 8 more
Exception in thread "main" io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
    at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
    at ch02.lambda$main$0(ch02.java:28)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.Observable.subscribe(Observable.java:10827)
    at io.reactivex.Observable.subscribe(Observable.java:10787)
    at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
    ... 8 more

我的问题:

  1. 我是否理解得正确?
  2. 引起异常的真正原因是什么?
  3. 消费者应该如何解决这个问题?

请参考2.1.1版本以后的FlowableEmitter.tryOnError和类似方法。 - tim4dev
4个回答

63
  1. 是的,但因为可观察的“ends”并不意味着运行在create(...)内部的代码已停止。在这种情况下完全确保安全,您需要使用o.isDisposed()来查看可观察对象是否在下游结束。
  2. 异常出现是因为RxJava 2的策略是绝不允许丢失onError调用。如果可观察对象已经终止,则它要么向下传递,要么作为全局UndeliverableException抛出。对于可观察对象已经结束并发生异常的情况,如何“正确”处理是由可观察对象的创建者决定的。
  3. 问题在于生产者(Observable)和消费者(Subscriber)在流何时结束上存在分歧。在这种情况下,由于生产者超过了消费者,因此只能在生产者中解决该问题。

6
这样处理是否正确?if (!o.isDisposed()) { o.onError(new Exception("Oops")); } - P.J.Meisch
2
如果在可观察对象不再被观察的情况下,可以忽略该异常,那么是可以接受的。如果该异常确实需要被处理,则应该无条件地调用它。 - Kiskae
2
不,这需要在生产者中修复,因为消费者已经声明自己已终止。 - Kiskae
2
如果(!o.isDisposed()){ o.onError(new Exception("Oops")); }不是正确的处理方式,因为存在竞态条件(在if条件和onError调用之间,o可以被处理掉,这不是理论上的问题,在生产系统中确实会发生)。请参见此讨论:https://github.com/ReactiveX/RxJava/issues/4880。 - Emanuel Moecklin
2
@AbdElraoufSabri 当tryOnError引入后,我抛弃了那段代码。虽然在某个旧的提交中可以找到它,但现在有了tryOnError还有什么意义呢? - Emanuel Moecklin
显示剩余9条评论

22

在上一条评论中,@Kiskae正确回答了为什么会出现这种异常的原因。

这里是关于这个主题的官方文档链接: RxJava2-wiki

有时您无法更改此行为,因此有一种方法可以处理这些UndeliverableException。以下是如何避免崩溃和错误行为的代码片段:

RxJavaPlugins.setErrorHandler(e -> {
    if (e instanceof UndeliverableException) {
        e = e.getCause();
    }
    if ((e instanceof IOException) || (e instanceof SocketException)) {
        // fine, irrelevant network problem or API that throws on cancellation
        return;
    }
    if (e instanceof InterruptedException) {
        // fine, some blocking code was interrupted by a dispose call
        return;
    }
    if ((e instanceof NullPointerException) || (e instanceof IllegalArgumentException)) {
        // that's likely a bug in the application
        Thread.currentThread().getUncaughtExceptionHandler()
            .handleException(Thread.currentThread(), e);
        return;
    }
    if (e instanceof IllegalStateException) {
        // that's a bug in RxJava or in a custom operator
        Thread.currentThread().getUncaughtExceptionHandler()
            .handleException(Thread.currentThread(), e);
        return;
    }
    Log.warning("Undeliverable exception received, not sure what to do", e);
});

这段代码来自上面链接。

重要提示。此方法将全局错误处理程序设置为RxJava,如果您想摆脱这些异常,那么最好的选择是取消此设置。


我无法摆脱这些异常,我需要从这个RX获取一些数据,但当出现此错误时,我无法从Json中获取我的数据..现在该怎么办?:( - Anice Jahanjoo
当我在Observable.create中进行某些网络请求时,我的subscriber在网络调用开始时并没有被处理,而是在我获取调用响应时已经被处理了。有没有一种方法可以在RxJavaPlugins.setErrorHandler中不出现InterruptedException - isabsent

6

Kotlin

我把这个写在MainActivity的onCreate方法里。

private fun initRxErrorHandler(){
    RxJavaPlugins.setErrorHandler { throwable ->
        if (throwable is UndeliverableException) {
            throwable.cause?.let {
                Thread.currentThread().uncaughtExceptionHandler?.uncaughtException(Thread.currentThread(), it)
                return@setErrorHandler
            }
        }
        if (throwable is IOException || throwable is SocketException) {
            // fine, irrelevant network problem or API that throws on cancellation
            return@setErrorHandler
        }
        if (throwable is InterruptedException) {
            // fine, some blocking code was interrupted by a dispose call
            return@setErrorHandler
        }
        if (throwable is NullPointerException || throwable is IllegalArgumentException) {
            // that's likely a bug in the application
            Thread.currentThread().uncaughtExceptionHandler?.uncaughtException(Thread.currentThread(), throwable)
            return@setErrorHandler
        }
        if (throwable is IllegalStateException) {
            // that's a bug in RxJava or in a custom operator
            Thread.currentThread().uncaughtExceptionHandler?.uncaughtException(Thread.currentThread(), throwable)
            return@setErrorHandler
        }
        Log.w("Undeliverable exception", throwable)
    }
}

0

在使用observable.create()时,只需使用tryOnError()。onError()不能保证错误得到处理。这里有各种错误处理运算符 在此处


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接