RxJava + Retrofit 轮询

22

我的问题是我无法使用Retrofit获取无限流。在获取初始poll()请求的凭据后,我进行了初始poll()请求。如果没有更改,则每个poll()请求会在25秒后响应,或者如果有任何更改,则会更早地响应-返回changed_data[]。每个响应都包含下一个poll请求所需的timestamp数据-我应该在每个poll()响应后进行新的poll()请求。这是我的代码:

getServerApi().getLongPollServer() 
  .flatMap(longPollServer -> getLongPollServerApi(longPollServer.getServer()).poll("a_check", Config.LONG_POLLING_SERVER_TIMEOUT, 2, longPollServer.getKey(), longPollServer.getTs(), "") 
   .take(1) 
   .flatMap(longPollEnvelope -> getLongPollServerApi(longPollServer.getServer()).poll("a_check", Config.LONG_POLLING_SERVER_TIMEOUT, 2, longPollServer.getKey(), longPollEnvelope.getTs(), ""))) 
   .retry()
   .subscribe(longPollEnvelope1 -> {
   processUpdates(longPollEnvelope1.getUpdates());
});

我是RxJava的新手,可能有些地方还不太理解,但是我无法获取无限流。我只能获得3个调用,然后是onNext和onComplete。

另外,也许有更好的解决方案来实现Android上的长轮询吗?


在你的情况下,我会考虑使用Observable.create()来实现自己的“Observable”。 - Dmitry Zaytsev
1个回答

12

虽然不是理想的做法,但我认为你可以使用 RX 的副作用来实现所需的结果('doOn' 操作)。

Observable<CredentialsWithTimestamp> credentialsProvider = Observable.just(new CredentialsWithTimestamp("credentials", 1434873025320L)); // replace with your implementation

Observable<ServerResponse> o = credentialsProvider.flatMap(credentialsWithTimestamp -> {
    // side effect variable
    AtomicLong timestamp = new AtomicLong(credentialsWithTimestamp.timestamp); // computational steering (inc. initial value)
    return Observable.just(credentialsWithTimestamp.credentials) // same credentials are reused for each request - if invalid / onError, the later retry() will be called for new credentials
            .flatMap(credentials -> api.query("request", credentials, timestamp.get()))  // this will use the value from previous doOnNext
            .doOnNext(serverResponse -> timestamp.set(serverResponse.getTimestamp()))
            .repeat();
})
        .retry()
        .share();

private static class CredentialsWithTimestamp {

    public final String credentials;
    public final long timestamp; // I assume this is necessary for you from the first request

    public CredentialsWithTimestamp(String credentials, long timestamp) {
        this.credentials = credentials;
        this.timestamp = timestamp;
    }
}
当订阅'o'时,内部可观察对象将重复。如果出现错误,则'o'将重试并从凭据流重新发出请求。
在您的示例中,通过更新时间戳变量来实现计算引导,这对于下一个请求是必要的。

感谢您的回答。然而,我从 API 中获取了一个时间戳,我应该在使用新的 poll() 调用时将其发送回去。 - localhost
我已经更新了答案,希望更贴近你的情况。当你获得服务器响应时,可以看到你只是设置了一个变量。"doOnNext"使副作用显式化。我的担忧是这不太好看,我们需要看看你的代码才能给出更好的答案。 - snodnipper
我遇到了类似的问题,并使用您的代码解决了它,但在我的情况下,我想在第一次存储值时也进行存储。我应该把那段代码放在哪里? - Krutik
@Krutik - 你可以第二次订阅 "o" 并使用 first 运算符。如果需要进一步解释,请考虑创建另一个问题 - 默认值、错误条件和 share() 行为可能是需要考虑的因素。 - snodnipper

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接