Spark:如何将List <RDD>联合到RDD?

8

我对Spark和Scala语言很陌生,希望将以下(List<RDD> to RDD)列表中的所有RDD进行合并:

 val data = for (item <- paths) yield {
        val ad_data_path = item._1
        val ad_data = SparkCommon.sc.textFile(ad_data_path).map {
            line => {
                val ad_data = new AdData(line)
                (ad_data.ad_id, ad_data)
            }
        }.distinct()
    }
 val ret = SparkCommon.sc.parallelize(data).reduce(_ ++ _)

我在IntelliJ中运行代码时,总是出现以下错误:

ava.lang.NullPointerException
at org.apache.spark.rdd.RDD.<init>(RDD.scala:125)
at org.apache.spark.rdd.UnionRDD.<init>(UnionRDD.scala:59)
at org.apache.spark.rdd.RDD.union(RDD.scala:438)
at org.apache.spark.rdd.RDD.$plus$plus(RDD.scala:444)
at data.GenerateData$$anonfun$load_data$1.apply(GenerateData.scala:99)
at data.GenerateData$$anonfun$load_data$1.apply(GenerateData.scala:99)
at scala.collection.TraversableOnce$$anonfun$reduceLeft$1.apply(TraversableOnce.scala:177)
at scala.collection.TraversableOnce$$anonfun$reduceLeft$1.apply(TraversableOnce.scala:172)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:172)
at org.apache.spark.InterruptibleIterator.reduceLeft(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:847)
at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:845)
at org.apache.spark.SparkContext$$anonfun$26.apply(SparkContext.scala:1157)
at org.apache.spark.SparkContext$$anonfun$26.apply(SparkContext.scala:1157)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

有人对这个错误有任何想法吗?提前感谢您 :)
2个回答

19

这可能是引起问题的原因,

val listA = 1 to 10
for(i <- listA; if i%2 == 0)yield {i}

将返回Vector(2,4,6,8,10),而

for(i <- listA; if i%2 == 0)yield {val c = i}

将返回 Vector((), (), (), (), ())。

这正是您的情况。 您正在初始化 ad_data 但未将其返回给 yield。

就您的问题而言,即List[RDD]转换为RDD

以下是解决方案:

val listA = sc.parallelize(1 to 10)
val listB = sc.parallelize(10 to 1 by -1)

创建 2 个RDDS的列表

val listC = List(listA,listB)

List[RDD] 转换为 RDD

val listD = listC.reduce(_ union _)
希望这回答了您的问题。

非常感谢,你的解决方案解决了我的问题。 - juffun
@juffun,如果解决方案对您有用,请接受答案 :) - Akash

0

将RDD列表转换为RDD的另一种简单方法。 SparkContext有两个重载的union方法,一个接受两个RDD,另一个接受RDD列表。

union(first, rest) union(rdds: Seq[RDD[T]]))


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接