如何在RxJava/RxAndroid中重复多次相同的网络请求(使用不同的参数)?

5
所以,我有一个名为/download的API,它会根据自己的参数返回一个通用的Object(基于索引号),然后我必须将其保存到我的数据库中。如果事务成功,我必须增加我的索引并重复相同的过程,否则就要retry()
我需要重复这个过程大约50次。
如何使用Rx-Java实现这个过程?我现在卡住了。任何帮助都会很棒。谢谢。

我认为更可能面临的是HTTP错误而不是事务错误。因此,HTTP层需要更大的重试机制需求。 - WindRider
3个回答

5
Observable.range(1, 50)
    .flatMap(index ->      // for every index make new request
        makeRequest(index) // this shall return Observable<Response>
            .retry(N)      // on error => retry this request N times
    )
    .subscribe(response -> saveToDb(response));

回复评论(仅在上一条回复已保存到数据库后才进行新的请求):

Observable.range(1, 50)
    .flatMap(index ->      // for every index make new request
        makeRequest(index) // this shall return Observable<Response>
            .retry(N)      // on error => retry this request N times
            .map(response -> saveToDb(response)), // save and report success
        1                  // limit concurrency to single request-save
    )
    .subscribe();

我尝试了你的答案,似乎立即调用了另一个API。有没有办法等待结果,将其保存到数据库中,然后仅在此之后再调用新的API? - Sanjay Gubaju
为了实现这一点,您可以将保存操作移动到请求管道中,并将 flatMap 并发限制为 1。 - Yaroslav Stavnichiy

2

如果我理解正确,这段代码应该会指引你朝着正确的方向前进。

        BehaviorSubject<Integer> indexes = BehaviorSubject.createDefault(0);
        indexes.flatMap(integer -> Observable.just(integer)) // download operation
               .flatMap(downloadedObject -> Observable.just(integer)) // save operation
               .doOnNext(ind -> indexes.onNext(ind + 1))
               .subscribe(ind -> System.out.println("Index " + ind));

发生的情况是:

  1. BehaviorSubject是整个工作的一种启动器,它向链中提供索引。
  2. 第一个flatMap是您执行下载操作的地方
  3. 第二个flatMap是您将其保存到DB的地方
  4. 在doOnNext中,您必须发出onNext或onComplete以继续或完成处理。(可以在订阅者中完成此操作)

记得在onNext中添加停止条件,以避免陷入无限循环。


那么我该如何将从第一个flatMap(API)接收到的值传递给我的第二个flatMap(db)呢? - Sanjay Gubaju
1
它会自动传递,在这里我将这些变量命名为整数,但实际上第一个flatMap返回对象即User,它成为第二个flatMap的输入。 - MatBos

0
我需要重复这个大约50次。
您可以使用“范围”运算符并处理每个发出的Int。
如果交易成功,我必须增加我的索引。
在这种情况下,您需要使用concatMap操作符。它会按顺序处理每个Observable。
Observable<Response> makeRequest(int i) {...}

Completable saveToDb(Response response) {...}

Observable.range(1, 50)
    .concatMap(i -> makeRequest(i)
                        //I assume that you save your response to DB asynchronously
                        //if not - use doOnNext operator instead of flatMapCompletable
                        .flatMapCompletable(response -> saveToDb(response)
                        .toObservable())
    .retry()
    ...

我遇到了与 @Yaroslav 回答中相似的问题。makeRequest() 立即被调用,但它只应在数据库成功保存后才被调用。 - Sanjay Gubaju
concatMap 等待每个 ObservableonComplete。在我的例子中,它是包含 makeRequestsaveToDb 的组合 Observable。你的实现似乎有问题。不过我已经编辑了我的答案以修复代码格式。请发布你的代码。 - Maksim Ostrovidov

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接