我是Apache Spark的初学者。我想从一个RDD中过滤掉所有重量总和大于常数值的组。 "weight"映射也是一个RDD。这里有一个小型演示,要被过滤的组存储在“groups”中,常数值为12:
val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
val wm = weights.toArray.toMap
def isheavy(inp: String): Boolean = {
val allw = inp.split(",").map(wm(_)).sum
allw > 12
}
val result = groups.filter(isheavy)
当输入的数据非常大时,例如大于10GB,我经常会遇到“java heap out of memory”错误。我怀疑这是由于“weights.toArray.toMap”引起的,因为它将一个分布式RDD转换为JVM中的Java对象。所以我尝试直接使用RDD进行过滤:
val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
def isheavy(inp: String): Boolean = {
val items = inp.split(",")
val wm = items.map(x => weights.filter(_._1 == x).first._2)
wm.sum > 12
}
val result = groups.filter(isheavy)
当我在将此脚本加载到Spark Shell中后运行
result.collect
时,我会得到一个"java.lang.NullPointerException"错误。有人告诉我,当RDD在另一个RDD中被操作时,会出现空指针异常,并建议我将权重放入Redis。那么,我如何在不将"weight"转换为Map或将其放入Redis的情况下获取"result"? 如果有一种基于类似Map的RDD筛选另一个RDD的解决方案而不需要外部数据存储服务的帮助呢?
谢谢!