Scala Observable: 将Observable与Sequence统一,无需中间数据结构更新。

9

我有一段调用couchbase获取某些行的代码,如下所示:

val gotValues: Observable[JsonDocument] = Observable.from(rowKeys).flatMap(id =>
      couchbaseBucket.async().get(id))

如果我输入1、2、3、4、5、6作为行键,但只有1、2、3行存在于数据库中,则observable仅会通知1、2、3。
然而,我的要求是返回一个映射,其中1、2、3为true(存在于数据库中),4、5、6为false(意味着不存在于数据库中)。我成功地使用Scala observable实现了这一点,但我使用了一个中间的映射数据结构来返回包含所有ID的总映射。下面是一个模拟我的问题的示例代码。
object Main extends App {
  import rx.lang.scala.Observable

  val idsToFetch = Seq(1,2,3,4,5,6)

  println(isInDBOrNot()) // {1=true, 2=true, 3=true, 4=false, 5=false, 6=false}

  private def isInDBOrNot(): ConcurrentHashMap[Int, Boolean] = {
    val inAndNotInDB = new java.util.concurrent.ConcurrentHashMap[Int, Boolean]
    // - How can I avoid the additional data structure?
    // - In this case a map, so that the function will return
    //   a map with all numbers and for each if exist in DB?
    // - I mean I want the function to return a map I don't 
    //   want to populate that map inside the observer,
    //   it's like a mini side effect I would rather simply 
    //   manipulate the stream.

    Observable.from(idsToFetch)
      .filterNot(x => x == 4 || x == 5 || x == 6) // Simulate fetch from DB, 4,5,6 do not exist in DB, so not returned.
      .subscribe(
      x => inAndNotInDB.put(x, true),
      e => println(e),
      () => idsToFetch.filterNot(inAndNotInDB.containsKey)
        .foreach(inAndNotInDB.put(_, false)) // mark all non-found as false.
    )

    inAndNotInDB
  }

}

有没有不需要中间映射(不需要填充中间数据结构,而只是通过操作流来完成)的方法呢?看起来不太干净啊!!谢谢。

3个回答

4
您的问题似乎源于您使用了flatMap,如果对于给定的id没有数据在DB中,并且您获得了一个空的Observable,那么flatMap将不会为这样的id产生任何输出。因此,看起来您需要的是defaultIfEmpty,这被翻译成Scala的orElse。您可以使用orElseflatMap内返回一些默认值。所以修改您的示例代码如下:
def fetchFromDb(id: Int): Observable[String] = {
  if (id <= 3)
    Observable.just(s"Document #$id")
  else
    Observable.empty
}

def gotValue(idsToFetch: Seq[Int]): Observable[(Int, Boolean)] = {
  Observable.from(idsToFetch).flatMap((id: Int) => fetchFromDb(id).map(_ => (id, true)).orElse((id, false)))
}

println(gotValue(Seq(1, 2, 3, 4, 5, 6)).toBlocking.toList)

需要打印的内容为

List((1,true), (2,true), (3,true), (4,false), (5,false), (6,false))

或者您可以使用Option返回Some(JsonDocument)None,例如

def gotValueEx(idsToFetch: Seq[Int]): Observable[(Int, Option[String])] = {
  Observable.from(idsToFetch).flatMap((id: Int) => fetchFromDb(id).map(doc => (id, Option(doc))).orElse((id, None)))
}

println(gotValueEx(Seq(1, 2, 3, 4, 5, 6)).toBlocking.toList)

打印

列表((1,某些(Document #1)), (2,某些(Document #2)), (3,某些(Document #3)), (4,无), (5,无), (6,无))


1

做到这一点的方法如下:

(1) 将 ID 序列转换为 Observable 并使用 map 进行映射。

id => (id, false)

...所以你将得到一个类型为Observable[(Int, Boolean)]的可观察对象(我们称之为新的可观察对象first)。

(2)从数据库中获取数据并将每个获取的行映射为:

(some_id, true)

...在Observable[(Int, Boolean)]内部(我们称这个observable为last

(3) 连接firstlast

(4) (3)的结果转换为Map。来自first的重复元素将在此过程中被删除。(这将是您的resultObsrvable

(5) (可能)收集可观察对象(您的映射)的第一个和唯一元素。您可能根本不想这样做,但如果您确实想这样做,那么您应该真正了解在此时阻塞以收集结果的影响。无论如何,此步骤确实取决于您的应用程序具体情况(线程/调度/IO的组织方式),但蛮力方法应该类似于以下内容(有关更多详细信息,请参见this demo):

Await.result(resultObsrvable.toBlocking.toFuture, 2 seconds)

1
这个怎么样?
Observable.from(idsToFetch)
        .filterNot(x => x._1 == 4 || x._1 == 5 || x._1 == 6)
        .foldLeft(idToFetch.map{_->false}.toMap){(m,id)=>m+(id->true)}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接