假设我有一个水果来源,我想将它们的数量插入到数据库中。
我可以像这样做:
我可以像这样做:
Flow[Fruits]
.map { item =>
insertItemToDatabase(item)
}
但是这显然很慢——为什么要为每个项目插入数据库,当我可以将它们分组?因此,我想出了一个更好的解决方案:
Flow[Fruits]
.grouped(10000)
.map { items =>
insertItemsToDatabase(items)
}
但这意味着我必须在内存中保存10000个元素[香蕉,橙子,橙子,橙子,香蕉,...]
直到它们被刷新到数据库。这不是很低效吗?也许我可以这样做:
Flow[Fruits]
.grouped(100)
.map { items =>
consolidate(items) // this will return Map[String, Int]
}
.grouped(100)
// here I have Seq[Map[String, Int]]
.map { mapOfItems=>
insertMapToDatabase(mapOfItems)
}
据我理解,这种方法应该也可以一次处理10000个元素,但是如果元素经常重复,则不会占用太多内存。但是每个键仍然在内存中重复100次。我可以使用.grouped(10).map().grouped(10).map().grouped(10).map().grouped(10).map()
的方法... 但有没有更好的方法呢?也许可以使用以下方式:
Flow[Fruits]
.map { item =>
addToMap(item)
if(myMap.length == 10000) {
insertToDatabase(myMap)
clearMyMap()
}
}
但是这是否会破坏Akka流的概念,即处理阶段的独立性(因此并发性)?