RxJava中的重试缓冲区

3
一个热Observable会发出项目,我想将这些项目上传到服务器。 有两个考虑因素:
  1. 由于io操作的开销,我想将这些项目作为数组批处理并上传
  2. 由于io操作的不可靠性,我希望批处理的上传失败会被前置到下一个批次中。
Uploads succeed:
1 - 2 - 3 - 4 - 5
------------------
u(1,2,3) - u(4,5)

First upload fails:
1 - 2 - 3 - 4 - 5
------------------
u(1,2,3) - u(1,2,3,4,5)

我可以使用 buffer 操作符来实现第一个要求,但不知道如何满足第二个要求。

1个回答

2

这是我关于将失败情况存储在队列中的想法

最初的回答:

public class StackOverflow {

    public static void main(String[] args) {
        // store any failures that may have occurred
        LinkedBlockingQueue<String> failures = new LinkedBlockingQueue<>();

        toUpload()
                // buffer however you want
                .buffer(5)
                // here is the interesting part
                .flatMap(strings -> {
                    // add any previous failures
                    List<String> prevFailures = new ArrayList<>();
                    failures.drainTo(prevFailures);
                    strings.addAll(prevFailures);

                    return Flowable.just(strings);
                })
                .flatMapCompletable(strings -> {
                    // upload the data
                    return upload(strings).doOnError(throwable -> {
                        // if its an upload failure:
                        failures.addAll(strings);
                    });
                }).subscribe();
    }

    // whatever your source flowable is
    private static Flowable<String> toUpload() {
        return Flowable.fromIterable(Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i"));
    }

    // some upload operation
    private static Completable upload(List<String> strings) {
        return Completable.complete();
    }
}

这里有一些边缘情况需要注意,如果最后一个可流式化的缓存组失败了,它不会重试。通过retryWhen运算符可能可以实现这一点,但基本思想仍然是使用队列。


这可能可行,但我不会接受它作为答案,因为它违反了关于Observables和组合的许多理解。 - f.khantsis
“violates” 是什么意思?如果您在寻找一个仅适用于 Rx 的答案,我想不到一个。 - William Reed

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接