如何在Spark中根据另一个RDD的函数对RDD进行过滤?

3

我是Apache Spark的初学者。我想从一个RDD中过滤掉所有重量总和大于常数值的组。 "weight"映射也是一个RDD。这里有一个小型演示,要被过滤的组存储在“groups”中,常数值为12:

val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
val wm = weights.toArray.toMap
def isheavy(inp: String): Boolean = {
  val allw = inp.split(",").map(wm(_)).sum
  allw > 12
}
val result = groups.filter(isheavy)

当输入的数据非常大时,例如大于10GB,我经常会遇到“java heap out of memory”错误。我怀疑这是由于“weights.toArray.toMap”引起的,因为它将一个分布式RDD转换为JVM中的Java对象。所以我尝试直接使用RDD进行过滤:

val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
def isheavy(inp: String): Boolean = {
  val items = inp.split(",")
  val wm = items.map(x => weights.filter(_._1 == x).first._2)
  wm.sum > 12
}
val result = groups.filter(isheavy)

当我在将此脚本加载到Spark Shell中后运行result.collect时,我会得到一个"java.lang.NullPointerException"错误。有人告诉我,当RDD在另一个RDD中被操作时,会出现空指针异常,并建议我将权重放入Redis。
那么,我如何在不将"weight"转换为Map或将其放入Redis的情况下获取"result"? 如果有一种基于类似Map的RDD筛选另一个RDD的解决方案而不需要外部数据存储服务的帮助呢?
谢谢!
2个回答

4

假设您的组是唯一的。否则,请先通过distinct等方法使其唯一。 如果group或weights很小,则应该很容易。 如果group和weights都很大,您可以尝试使用以下方法,这可能更可扩展,但看起来也更复杂。

val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
//map groups to be (a, (a,b,c,d)), (b, (a,b,c,d), (c, (a,b,c,d)....
val g1=groups.flatMap(s=>s.split(",").map(x=>(x, Seq(s))))
//j will be (a, ((a,b,c,d),3)...
val j = g1.join(weights)
//k will be ((a,b,c,d), 3), ((a,b,c,d),2) ...
val k = j.map(x=>(x._2._1, x._2._2))
//l will be ((a,b,c,d), (3,2,5,1))...
val l = k.groupByKey()
//filter by sum the 2nd
val m = l.filter(x=>{var sum = 0; x._2.foreach(a=> {sum=sum+a});sum > 12})
//we only need the original list
val result=m.map(x=>x._1)
//don't do this in real product, otherwise, all results go to driver.instead using saveAsTextFile, etc
scala> result.foreach(println)
List(e,g)
List(b,c,e)

2
“Java内存不足”错误之所以出现是因为Spark在确定分割数时使用其“spark.default.parallelism”属性,而默认情况下该属性为可用核心数。
// From CoarseGrainedSchedulerBackend.scala

override def defaultParallelism(): Int = {
   conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}

当输入变得很大,而内存有限时,您应该增加拆分的数量。

您可以按照以下方式操作:

 val input = List("a,b,c,d", "b,c,e", "a,c,d", "e,g") 
 val splitSize = 10000 // specify some number of elements that fit in memory.

 val numSplits = (input.size / splitSize) + 1 // has to be > 0.
 val groups = sc.parallelize(input, numSplits) // specify the # of splits.

 val weights = Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)).toMap

 def isHeavy(inp: String) = inp.split(",").map(weights(_)).sum > 12
 val result = groups.filter(isHeavy)

您也可以考虑使用 spark.executor.memory 来增加执行器内存大小。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接