如何处理 RxJava 中 observer.onNext() 抛出的异常?

28

考虑以下示例:

Observable.range(1, 10).subscribe(i -> {
    System.out.println(i);

    if (i == 5) {
        throw new RuntimeException("oops!");
    }
}, Throwable::printStackTrace);

这会输出从1到5的数字,然后打印异常。

我想要实现的是当抛出异常后让观察者保持订阅并继续运行,即打印从1到10的所有数字。

我尝试使用retry()其他各种错误处理运算符,但如文档所述,它们的目的是处理可观测对象本身发射的错误。

最简单的解决方案就是将onNext的整个主体包装在try-catch块中,但我认为这不是一个好的解决方案。在类似的Rx.NET问题中,提出了一种解决方案,即创建一个代理可观测对象来进行包装。我尝试重新制作它:

Observable<Integer> origin = Observable.range(1, 10);
Observable<Integer> proxy = Observable.create((Observable.OnSubscribe<Integer>) s ->
        origin.subscribe(i -> {try { s.onNext(i); } catch (Exception ignored) {}}, s::onError, s::onCompleted));

proxy.subscribe(i -> {
    System.out.println(i);

    if (i == 5) {
        throw new RuntimeException("oops!");
    }
}, Throwable::printStackTrace);

这并不会改变任何事情,因为RxJava本身将订阅者包装成SafeSubscriber。使用unsafeSubscribe来规避它似乎也不是一个好的解决方案。

我该怎么办才能解决这个问题?

1个回答

19
这是在学习Rx时经常出现的问题。
TL;DR
将异常处理逻辑放在订阅者中比创建通用的可观察包装器更好。
Explanation
请记住,Rx是关于向订阅者推送事件的。
从可观察接口来看,一个可观察对象除了知道它们处理事件所花费的时间或抛出的异常信息之外,实际上并不知道订阅者的任何信息。
创建通用的包装器来处理订阅者异常并继续向该订阅者发送事件是一个坏主意。
为什么?因为可观察对象只应该知道订阅者现在处于未知的失败状态。在这种情况下继续发送事件是不明智的——也许,例如,订阅者处于每个从此时起的事件都会抛出异常并且需要一段时间才能完成的条件下。
一旦订阅者抛出异常,对于可观察对象只有两种可行的行动方案:
重新抛出异常 实现通用处理以记录失败并停止向其发送任何事件(任何类型),清理由于该订阅者而产生的任何资源,并继续进行任何剩余的订阅。

处理订阅者异常的具体方法是一个不好的设计选择;它会在订阅者和可观察对象之间创建不适当的行为耦合。因此,如果您想对不良订阅者具有弹性,上述两种选择确实是可观察对象本身责任的明智限度。

如果您希望您的订阅者具有弹性并继续运行,则应该绝对将其包装在异常处理逻辑中,该逻辑专门设计用于处理您知道如何恢复的特定异常(以及处理瞬态异常、日志记录、重试逻辑、断路器等)。

只有订阅者本身才能理解在面临失败时是否适合接收进一步的事件。

如果您的情况需要开发可重用的错误处理逻辑,请将自己置于包装观察者的事件处理程序的心态下,而不是可观察对象,并确保不要在面临失败时盲目继续传输事件。 Release It!虽然没有关于Rx的写作,但它是一部有趣的软件工程经典著作,对这个问题有很多话要说。如果您还没有阅读过它,我强烈建议您阅读。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接