RxJava无限重试新可观察对象。

5
我正在使用Rx-ified API for Vertx,这个问题涉及到一个潜在的无限重试直到成功的循环,我想实现它,但遇到了困难。我是RxJava新手。 以下是我的要求: 1.使用Vertx消息总线向另一个Vertx组件发送请求。 2.只要等待响应超时,就重新发出请求。 3.一旦收到请求响应,请检查结果,如果没有可用的内容,请等待一段时间,然后从第1步开始重复执行。
第一个问题: 我遇到的第一个问题是如何完成第2步。
如果您熟悉Vert.x Rx api,则可以理解在上述第1步中发出请求的含义。
vertx.eventBus().<JsonObject>sendObservable( "theAddress", aJsonObject );

上述代码返回一个Observable实例,该实例将发出响应或错误(例如超时)。该Observable永远不会再发出任何内容(否则每次订阅时它都会发出完全相同的内容,我不知道哪个是正确的)。
RxJava的重试运算符似乎无效。
vertx.eventBus().<JsonObject>sendObservable( "theAddress", aJsonObject )
     .retry()

我认为为了进行重试,我可以使用RxJava的retry()操作符,但我尝试过后发现这样做没有任何有用的效果,因为源可观测对象的性质。没有发送新的请求消息,因为唯一被“重试”的是对原始源的订阅,而原始源永远不会发出任何不同的东西。
RxJava的retryWhen操作符似乎无法正常工作
vertx.eventBus().<JsonObject>sendObservable( "theAddress", aJsonObject )
     .retryWhen( error -> {
        return _vertx.eventBus().<JsonObject>sendObservable( "theAddress", aJsonObject )
      })

于是我认为可以使用RxJava的retryWhen()操作符,当根observable发出错误时,它允许我发出第二个observable。我想,第二个observable只需是在步骤1中生成初始observer的相同代码。
但是,retryWhen()操作符(请参阅文档)不允许这个第二个observable发出错误而不以错误结束订阅。
所以,我在尝试在这个链的第一部分中设置一个潜在的无限重试循环时遇到了麻烦。
我肯定是错过了什么,但我还没有能够确定是什么。
第二个问题
vertx.eventBus().<JsonObject>sendObservable( "theAddress", aJsonObject )
     // imagine that retryWhen() accomplishes an infinite retry
     .retryWhen( error -> {
        return _vertx.eventBus().<JsonObject>sendObservable( ... )
      })
     .flatMap( response -> {
        // inspect response, if it has usable data,
        // return that data as an observable
        return Observable.from(response.data());

        // if the response has no usable data,
        // wait for some time, then start the whole process
        // all over again
        return Observable.timer(timeToWait).<WHAT GOES HERE?>;
     })

第二个问题是如何实现步骤3。对我来说,这似乎像第一个问题,只是更难理解,因为我不需要重试立即的源可观察对象,我需要等待一段时间,然后重新开始步骤1。
无论我创建什么Observable,似乎都需要链条中此时之前的所有元素,这似乎是一种应该避免的递归。在这一点上,任何帮助或建议都将非常受欢迎。

1
你订阅了retry()的结果吗? - akarnokd
是的。抱歉 - 示例中没有显示订阅调用。 - Hoobajoob
1个回答

4
非常感谢在RxJava Google Group上的Ben Christensen的指导,他指出defer()操作符将在每个订阅中生成一个新的Observable。这可以与标准的retry()操作符组合使用以获得无限重试。
因此,对于我问题中第一个问题,最简单的解决方案如下:
Observable.defer( () -> vertx.eventBus().<JsonObject>sendObservable( "theAddress", aJsonObject ) )
.retry()

如果需要指数退避,您可以在提供给defer()操作符的工厂方法中添加带有适当参数的Observable.timer()。

我仍在研究第二个问题。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接