考虑以下示例:
Observable.range(1, 10).subscribe(i -> {
System.out.println(i);
if (i == 5) {
throw new RuntimeException("oops!");
}
}, Throwable::printStackTrace);
这会输出从1到5的数字,然后打印异常。
我想要实现的是当抛出异常后让观察者保持订阅并继续运行,即打印从1到10的所有数字。
我尝试使用retry()
和其他各种错误处理运算符,但如文档所述,它们的目的是处理可观测对象本身发射的错误。
最简单的解决方案就是将onNext
的整个主体包装在try-catch块中,但我认为这不是一个好的解决方案。在类似的Rx.NET问题中,提出了一种解决方案,即创建一个代理可观测对象来进行包装。我尝试重新制作它:
Observable<Integer> origin = Observable.range(1, 10);
Observable<Integer> proxy = Observable.create((Observable.OnSubscribe<Integer>) s ->
origin.subscribe(i -> {try { s.onNext(i); } catch (Exception ignored) {}}, s::onError, s::onCompleted));
proxy.subscribe(i -> {
System.out.println(i);
if (i == 5) {
throw new RuntimeException("oops!");
}
}, Throwable::printStackTrace);
这并不会改变任何事情,因为RxJava本身将订阅者包装成SafeSubscriber
。使用unsafeSubscribe
来规避它似乎也不是一个好的解决方案。
我该怎么办才能解决这个问题?