如何忽略错误并继续无限流?

61

我想知道如何忽略异常并继续无限流(在我的情况下,是位置流)?

我正在获取当前用户位置(使用Android-ReactiveLocation),然后将其发送到我的API(使用Retrofit)。

在我的情况下,当网络调用期间发生异常(例如超时),会调用onError方法并停止流。该如何避免这种情况发生?

活动:

private RestService mRestService;
private Subscription mSubscription;
private LocationRequest mLocationRequest = LocationRequest.create()
            .setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY)
            .setInterval(100);
...
private void start() {
    mRestService = ...;
    ReactiveLocationProvider reactiveLocationProvider = new ReactiveLocationProvider(this);
    mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
            .buffer(50)
            .flatMap(locations -> mRestService.postLocations(locations)) // can throw exception
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe();
}

RestService:

public interface RestService {
    @POST("/.../")
    Observable<Response> postLocations(@Body List<Location> locations);
}

请参考以下类似的答案,其中包括点击事件异常处理:http://stackoverflow.com/questions/26154236/how-to-subscribe-to-click-events-so-exceptions-dont-unsubscribe/26229584#26229584 - Kirill Boyarshinov
这个编程问题的设置有点过于复杂了。你本可以用几行代码来简化它,无需考虑特定项目语义。 - Martin Andersson
11个回答

78
您可能希望使用其中一种错误处理运算符
  • onErrorResumeNext() - 如果Observable遇到错误,指示它发出一个项目序列。
  • onErrorReturn() - 如果Observable遇到错误,指示它发出特定的项目。
  • onExceptionResumeNext() - 如果Observable遇到异常(但不是其他类型的throwable),则指示它继续发出项目。
  • retry() - 如果源Observable发出错误,请重新订阅它,以希望它能够完成而没有错误。
  • retryWhen() - 如果源Observable发出错误,请将该错误传递给另一个Observable,以确定是否要重新订阅源。

特别是retryonExceptionResumeNext在您的情况下看起来很有前途。


4
当我在flatMap(locations -> mRestService.postLocations(locations))后加入 onExceptionResumeNext(Observable.empty())时,将会调用onCompleted并结束数据流。请注意不要改变原来的意思。 - Ziem
27
不要在flatMap之后添加它,而是将其放在flatMap内部。 - Samuel Gruetter
onErrorResumeNext( ) - 对于回退非常有用的语句。谢谢! - Taras Lozovyi

28

mRestService.postLocations(locations)会发出一个项目,然后完成。如果发生错误,则会发出错误并完成流。

当您在flatMap中调用此方法时,错误会继续传递到“主”流中,然后您的流就会停止。

您可以将错误转换为另一个项目(如此处所述:https://dev59.com/al4b5IYBdhLWcg3wiiV4#28971140),但不是在主要流中进行转换(我假设您已经尝试过了),而是在mRestService.postLocations(locations)上进行转换。

这样,此调用将发出一个错误,该错误将被转换为项目/另一个可观察对象,然后完成(无需调用onError)。

从消费者的角度来看,mRestService.postLocations(locations)将发出一个项目,然后完成,就像一切都成功一样。

mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
        .buffer(50)
        .flatMap(locations -> mRestService.postLocations(locations).onErrorReturn((e) -> Collections.emptyList()) // can't throw exception
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe();

5
很遗憾,它不能被忽略,而是会发出空列表。最好的情况是在出错时不要发出任何内容。 - Andrew Matiuk
1
“// can throw exception” 是什么意思?这是否意味着即使在 onErrorReturn 中,我们也可能再次遇到异常?! - Dr.jacky
1
我认为这是一个错误,应该改为“无法抛出异常”(所以我只编辑了注释)。发现得好! - dwursteisen

20

如果你只想忽略 flatMap 中的错误而不返回任何元素,那么可以这样做:

flatMap(item -> 
    restService.getSomething(item).onErrorResumeNext(Observable.empty())
);

2
这实际上会完成流吗? - Mario Lenci

7

以下是 @MikeN 回答中提供的链接信息(以防丢失):

import rx.Observable.Operator;
import rx.functions.Action1;

public final class OperatorSuppressError<T> implements Operator<T, T> {
    final Action1<Throwable> onError;

    public OperatorSuppressError(Action1<Throwable> onError) {
        this.onError = onError;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> t1) {
        return new Subscriber<T>(t1) {

            @Override
            public void onNext(T t) {
                t1.onNext(t);
            }

            @Override
            public void onError(Throwable e) {
                onError.call(e);
            }

            @Override
            public void onCompleted() {
                t1.onCompleted();
            }

        };
    }
}

并且在靠近可观察源的地方使用它,因为其他运算符可能会在此之前急切地取消订阅。

Observerable.create(connectToUnboundedStream()).lift(new OperatorSuppressError(log()).doOnNext(someStuff()).subscribe();

请注意,这会抑制源代码的错误传递。如果链中的任何onNext在它之后抛出异常,则仍有可能取消订阅源。

1
这个代码可以工作,但是在错误被抑制之后,我的源可观察对象似乎停止工作了。有没有一种方法可以重新启动它? - Matthias
@Matthias,这个方法不能解决问题吗?(和onErrorResumeNext一样 - 它会完成可观察对象)? - User

2
这个答案可能有点晚,但如果有人遇到了这个问题,可以使用Jacke Wharton准备好的Relay库,而不是重新发明轮子。

https://github.com/JakeWharton/RxRelay

有很好的文档,但实质上,Relay是一个主题,但没有调用onComplete或onError的能力。
选项包括:
BehaviorRelay
Relay that emits the most recent item it has observed and all subsequent observed items to each subscribed Observer.

    // observer will receive all events.
    BehaviorRelay<Object> relay = BehaviorRelay.createDefault("default");
    relay.subscribe(observer);
    relay.accept("one");
    relay.accept("two");
    relay.accept("three");

    // observer will receive the "one", "two" and "three" events, but not "zero"
    BehaviorRelay<Object> relay = BehaviorRelay.createDefault("default");
    relay.accept("zero");
    relay.accept("one");
    relay.subscribe(observer);
    relay.accept("two");
    relay.accept("three");

PublishRelay是一个Relay,一旦Observer订阅了它,就会将所有随后观察到的项目发射给订阅者。

    PublishRelay<Object> relay = PublishRelay.create();
    // observer1 will receive all events
    relay.subscribe(observer1);
    relay.accept("one");
    relay.accept("two");
    // observer2 will only receive "three"
    relay.subscribe(observer2);
    relay.accept("three");

ReplayRelay是一个缓存所有观察到的项目并将它们重新发送给任何订阅者的中继器。
    ReplayRelay<Object> relay = ReplayRelay.create();
    relay.accept("one");
    relay.accept("two");
    relay.accept("three");
    // both of the following will get the events from above
    relay.subscribe(observer1);
    relay.subscribe(observer2);

这应该是被接受的答案。Repay对于事件总线类型的广播非常方便。 - X.Y.

1
尝试在Observable.defer调用中调用REST服务。这样,每次调用时都可以使用自己的'onErrorResumeNext',错误不会导致主流程完成。
reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
  .buffer(50)
  .flatMap(locations ->
    Observable.defer(() -> mRestService.postLocations(locations))
      .onErrorResumeNext(<SOME_DEFAULT_TO_REACT_TO>)
  )
........

那个解决方案最开始来自于这个帖子-> RxJava Observable和Subscriber在跳过异常方面的应用?,但我认为它在你的情况下也可以使用。

1
添加我的解决方案:
privider
    .compose(ignoreErrorsTransformer)
    .subscribe()

private final Observable.Transformer<ResultType, ResultType> ignoreErrorsTransformer =
        new Observable.Transformer<ResultType, ResultType>() {
            @Override
            public Observable<ResultType> call(Observable<ResultType> resultTypeObservable) {
                return resultTypeObservable
                        .materialize()
                        .filter(new Func1<Notification<ResultType>, Boolean>() {
                            @Override
                            public Boolean call(Notification<ResultType> resultTypeNotification) {
                                return !resultTypeNotification.isOnError();
                            }
                        })
                        .dematerialize();

            }
        };

“materialize”运算符是什么意思? - Sergio Sánchez Sánchez

0
一个稍微修改过的解决方案(@MikeN)以使有限流能够完成:
import rx.Observable.Operator;
import rx.functions.Action1;

public final class OperatorSuppressError<T> implements Operator<T, T> {
    final Action1<Throwable> onError;

    public OperatorSuppressError(Action1<Throwable> onError) {
        this.onError = onError;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> t1) {
        return new Subscriber<T>(t1) {

            @Override
            public void onNext(T t) {
                t1.onNext(t);
            }

            @Override
            public void onError(Throwable e) {
                onError.call(e);
                //this will allow finite streams to complete
                t1.onCompleted();
            }

            @Override
            public void onCompleted() {
                t1.onCompleted();
            }

        };
    }
}

0

你可以使用onErrorComplete()方法来跳过错误

mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
    .buffer(50)
    .flatMapMaybe(locations -> Maybe.just(mRestService.postLocations(locations).onErrorComplete()) // skip item
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe();

0
使用Rxjava2,我们可以调用带有delayErrors参数的flatmap方法: flatmap javadoc 当将其设置为true时:

来自当前Flowable和所有内部发布者的异常会被延迟直到它们全部终止;如果设置为false,则第一个信号异常的发布者将立即终止整个序列。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接