RxJava:使用重试限制的retryWhen

10

我对ReactiveX和反应式编程都很陌生。我需要为Couchbase CAS操作实现一个重试机制,但是Couchbase网站上的示例显示了一个retryWhen,看起来会无限重试。我需要在其中设置重试限制和重试计数。

简单的retry()可以工作,因为它接受一个retryLimit,但我不想让它在每个异常发生时都重试,只想在出现CASMismatchException时重试。

有什么建议吗?我正在使用RxJava库。

3个回答

9
除了Simon Basle所说的内容外,这里有一个带有线性退避的快速版本:
.retryWhen(notification ->
    notification
    .zipWith(Observable.range(1, 5), Tuple::create)
    .flatMap(att ->
            att.value2() == 3 ? Observable.error(att.value1()) : Observable.timer(att.value2(), TimeUnit.SECONDS)
    )
)

请注意,“att”在此处是一个元组,包含可抛出对象和重试次数两个参数,因此您可以非常具体地根据这两个参数实现返回逻辑。
如果您想了解更多信息,可以查看我正在撰写的弹性文档:https://gist.github.com/daschl/db9fcc9d2b932115b679#retry-with-delay

有人能解释一下,为什么这里需要使用 flatMap 吗?似乎没有它就无法工作。 - deviant

6
retryWhen相对于简单的重试显然要复杂一些,但以下是它的要点:
- 你需要将一个notificationHandler函数传递给retryWhen,该函数接收一个Observable并输出一个Observable - 返回的Observable的发射决定了何时进行或停止重试 - 因此,对于原始流中出现的每个异常,如果处理程序返回1个项目,则会重试1次。如果它返回2个项目,那么就会重试2次...... - 只要处理程序的流发出错误,重试就会中止。
使用此方法,您可以同时:
- 只针对CasMismatchExceptions进行操作:在其他情况下,只需使您的函数返回Observable.error(t) - 仅针对特定次数重试:对于每个异常,请从表示最大重试次数的Observable.range中进行flatMap,让其使用重试号返回一个Observable.timer,如果需要增加延迟。

你的使用情境与 RxJava 文档 这里 中的情境非常接近。


我想在有计数限制的情况下使用重试。我尝试了Observable.range(0, 10),但它不起作用。 - Allen Vork

3

重新启动这个线程,因为在Couchbase Java SDK 2.1.2中有一种新的更简单的方法来实现这个功能:使用RetryBuilder

Observable<Something> retryingObservable =
sourceObservable.retryWhen(
  RetryBuilder
    //will limit to the relevant exception
    .anyOf(CASMismatchException.class)
    //will retry only 5 times
    .max(5)
    //delay doubling each time, from 100ms to 2s
    .delay(Delay.linear(TimeUnit.MILLISECONDS, 2000, 100, 2.0))
  .build()
);

你能否附加几个不同延迟的异常?并且你能为所有其余的异常设置默认延迟吗? - Roee Gavirel
不,你必须简单地链接多个retryWhen,每个都有自己的构建器,就像在命令式代码中链接多个catch块一样。 - Simon Baslé

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接