RxJava - 如何将scan和flatMap结合起来使用?

10

假设我有一个函数,它接受一个String和一个long参数,并返回一个Single<String>

Single<String> stringAddition(String someString, long value) {
  return Single.just(someString + Long.toString(value));
}

现在我有这个Observable...

Observable.interval(1, SECONDS)
  .scan("", (cumulativeString, item) -> {
    // Need to return the result of stringAddition(cummulativeString, item)
  });

我不知道该怎么做。 Scan 要求我返回一个 String,但是我想使用返回 Single<String> 的方法。对我来说,似乎需要一些可以结合两个操作 scanflatMap 的东西。是否有任何 RxJava2 技巧可以帮助我?


为什么你一开始就返回Single<String>呢? - akarnokd
1
我试图想出一个简单的例子来解释一个在上下文中更为复杂的问题。在我的当前项目中,它实际上是一个 Single<Response> 的 Web 服务请求。 - Chris Horner
1
我现在意识到我的示例实际上太简单了,无法正确地演示我试图解决的问题。如果我使用Single<Response>作为累积值进行扫描,那么实际上每个请求都会执行两次:( - Chris Horner
1个回答

5
你可以按照以下步骤实现。如果 stringAddition 返回了 Observable,则可以将其缩短一些。
Observable<String> scanned = Observable.interval(1, TimeUnit.SECONDS)
            .scan(
                    Observable.just(""),
                    (cumulativeString, item) ->
                        cumulativeString
                          .flatMap(str -> stringAddition(str, item).toObservable())
            )
            .flatMap(it -> it);

1
这个答案确实适用于我的简单示例。但我认为这个答案最终会订阅每个Single<String>两次;一次在扫描内部的flatMap中,另一次在外部的flatMap中。你能想到任何不需要两次订阅Single<String>的解决方案吗? - Chris Horner
我的意思是,我猜我可以在scan生成的Observable上使用.cache(),那应该可以工作,但我觉得应该有更简洁的方法! - Chris Horner
这有点晚了,但我认为你可以使用replay(1)而不是.cache()来获得所需的效果。在C#中还有另一种方法,但我不确定它是否适用于Java。您可以返回一个Task而不是可观察对象,并在扫描时等待累积值,然后返回新值。任务缓存其值并在等待时返回它。 - NickL

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接