Scala / Akka Streams中元素的分组

4
假设我有一个水果来源,我想将它们的数量插入到数据库中。
我可以像这样做:
Flow[Fruits]
.map { item =>
    insertItemToDatabase(item)
}

但是这显然很慢——为什么要为每个项目插入数据库,当我可以将它们分组?因此,我想出了一个更好的解决方案:

Flow[Fruits]
.grouped(10000)
.map { items =>
    insertItemsToDatabase(items)
}

但这意味着我必须在内存中保存10000个元素[香蕉,橙子,橙子,橙子,香蕉,...]直到它们被刷新到数据库。这不是很低效吗?也许我可以这样做:

Flow[Fruits]
.grouped(100)
.map { items =>
    consolidate(items)  // this will return Map[String, Int]
}
.grouped(100)
// here I have Seq[Map[String, Int]]
.map { mapOfItems=>
    insertMapToDatabase(mapOfItems)
}

据我理解,这种方法应该也可以一次处理10000个元素,但是如果元素经常重复,则不会占用太多内存。但是每个键仍然在内存中重复100次。我可以使用.grouped(10).map().grouped(10).map().grouped(10).map().grouped(10).map()的方法... 但有没有更好的方法呢?也许可以使用以下方式:

Flow[Fruits]
.map { item =>
    addToMap(item)
    if(myMap.length == 10000) {
        insertToDatabase(myMap)
        clearMyMap()
    }
}

但是这是否会破坏Akka流的概念,即处理阶段的独立性(因此并发性)?


请看函数“groupedWithin”。它有两个参数:元素的最大限制和时间速率。例如,如果在1秒之前达到5000个元素,则“.groupedWithin(5000,1.seconds)”将提供待处理的5000个元素;否则就会积累在1秒钟内获得的元素数量。 - alifirat
感谢@alifirat的建议,但那只是一种不同的分组方式。我需要的是一种不同的处理数据的方式,既友好于内存又友好于数据库。 - Honza Zíka
1个回答

2
如果“水果”集合的基数很低,那么您可以保留一个带有所有计数的单一映射,然后在流式处理所有“水果”值后将其刷新到数据库中。
首先,构造一个会保持运行计数的流:
type Count = Int

type FruitCount = Map[Fruit, Count]

val zeroCount : FruitCount = 
  Map.empty[Fruit, Count] withDefaultValue 0

val appendFruitToCount : (FruitCount, Fruit) => FruitCount = 
  (fruitCount, fruit) => fruitCount + (fruit -> fruitCount(fruit) + 1)

val fruitCountFlow : Flow[Fruit, FruitCount, NotUsed] =
  Flow[Fruit].scan(zeroCount)(appendFruitToCount)

现在创建一个接收最后一个 FruitCount 并实现流化的 Sink:
val lastFruitCountSink : Sink[FruitCount, _] = Sink.lastOption[FruitCount]

val fruitSource : Source[Fruit, NotUsed] = ???

val lastFruitCountFut : Future[Option[FruitCount]] = 
  fruitSource
    .via(fruitCountFlow)
    .to(lastFruitCountSink)
    .run()

然后可以使用lastFruitCountFut向数据库发送值:

lastFruitCountFut foreach (_ foreach (_ foreach { (fruit, count) =>
  insertItemsToDatabase( Iterator.fill(count)(fruit) )
}))

使用Iterator是因为它是构建水果项的TraversableOnce集合最节省内存的方式。

这种解决方案只会在内存中保留一个Map,该Map将为每种不同的水果类型和每个键保留1个整数。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接