RxJava重试在Observer中不起作用

3

这是一个非常简单的示例:

public static void main(String[] args) {
    Observable.from(ListUtils.asList(1, 2, 3)).retry(3).subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {
            System.out.println("onCompleted");
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("onError");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println(integer);
            if (integer == 3)
                throw new RuntimeException("onNext exception");

        }
    });
}

控制台输出为:1,2,3,onError。 但我的期望是:1,2,3,1,2,3,1,2,3,onError。
2个回答

4
一旦出现错误,订阅者将取消订阅可观察对象。如果使用retry运算符,则只有在该运算符未在主管道中使用而是在flatMap运算符中使用时才会重试。
由于这个retry是在一个flatMap之后,所以它会起作用。
@Test
public void retryInFlatMap() {
    Observable.from(Arrays.asList(1, 2, 3, 4))
            .flatMap(number -> Observable.just(number)
                    .doOnNext(n -> {
                        if (n == 2) {
                            throw new NullPointerException();
                        }
                    }))
                    .retry(3)
            .subscribe(n-> System.out.println("number:"+n));
}

这句话的意思是:由于在地图后面,这个可能不会。
int cont=0;
@Test
public void retryInMap() {
    Observable.from(Arrays.asList(1, 2, 3, 4))
            .map(number ->{
                        if (cont == 2) {
                            throw new NullPointerException();
                        }
                        cont++;
                        return number;
                    })
            .retry(3)
            .subscribe(n-> System.out.println("number:"+n));
}

如果您想查看更多示例,请查看这里:https://github.com/politrons/reactive

-1

retry()操作符在可观察对象发出错误时,重新订阅并执行上述过程。请参考this

我还编写了示例代码。

Observable.from(Arrays.asList(1, 2, 3, 4))
    .flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override public Observable<Integer> call(Integer integer) {
            if(integer == 4) {
                return Observable.error(new Exception());
            }
            return Observable.just(integer);
        }
    })
    // When error occurred, re-subscribe and execute above process
    // If error occurred over 2 times, Observer.onError occurred
    .retry(2)
    .subscribe(new Observer<Integer>() {
        @Override public void onCompleted() {
            System.out.println("onCompleted");
        }

        @Override public void onError(Throwable e) {
            System.out.println("onError");
        }

        @Override public void onNext(Integer integer) {
            System.out.println(integer);
        }
    });

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接