如何从RDD创建RDD集合?

6
我有一个类型为RDD[String]wordRDD。我还有一个可以从字符串/单词创建RDD[String]的函数。我想为wordRDD中的每个字符串创建一个新的RDD。以下是我的尝试:

1)失败了,因为Spark不支持嵌套RDD:

var newRDD = wordRDD.map( word => {
  // execute myFunction()
  (new MyClass(word)).myFunction()
})

2) 失败(可能是由于作用域问题导致的):

var newRDD = sc.parallelize(new Array[String](0))
val wordArray = wordRDD.collect
for (w <- wordArray){
  newRDD = sc.union(newRDD,(new MyClass(w)).myFunction())
}

我的理想结果应该是:

// input RDD (wordRDD)
wordRDD: org.apache.spark.rdd.RDD[String] = ('apple','banana','orange'...)

// myFunction behavior
new MyClass('apple').myFunction(): RDD[String] = ('pple','aple'...'appl')

// after executing myFunction() on each word in wordRDD:
newRDD: RDD[String] = ('pple','aple',...,'anana','bnana','baana',...)

我在这里找到了一个相关的问题:Spark当合并大量RDD时会抛出堆栈溢出错误,但它没有解决我的问题。

2个回答

3
您无法在另一个RDD中创建RDD。
但是,可以将生成输入中删除一个字母的所有单词的函数myFunction:String => RDD [String]重写为另一个函数modifiedFunction:String => Seq [String],以便可以从RDD内部使用它。这样,它也将在集群上并行执行。有了modifiedFunction,您只需调用wordRDD.flatMap(modifiedFunction)即可获得所有单词的最终RDD。
关键点是使用flatMap(对转换进行map和flatten):
def main(args: Array[String]) {
  val sparkConf = new SparkConf().setAppName("Test").setMaster("local[*]")
  val sc = new SparkContext(sparkConf)

  val input = sc.parallelize(Seq("apple", "ananas", "banana"))

  // RDD("pple", "aple", ..., "nanas", ..., "anana", "bnana", ...)
  val result = input.flatMap(modifiedFunction) 
}

def modifiedFunction(word: String): Seq[String] = {
  word.indices map {
    index => word.substring(0, index) + word.substring(index+1)
  }
}

3
使用flatMap以获取所需的RDD[String]
var allWords = wordRDD.flatMap { word => 
  (new MyClass(word)).myFunction().collect()
}

2
这个程序应该如何并行运行?在wordRDD.map中发生的所有事情都在集群上执行。因此,内部的collect必须从正在运行的作业中触发一个新的Spark作业。我怀疑它不会分布式地运行。 - Till Rohrmann
他还可以修改函数,使其返回数组而不是RDD,但问题并没有指定实际的函数。 - Climbs_lika_Spyder
但是他的描述说他有一个函数,我猜测它是myFunction,可以从一个字符串/单词创建一个RDD[String] - Till Rohrmann
是的,确实如此。你的回答告诉他将 myFunction 更改为返回不同的内容。不知道这个函数有多复杂,很难说其中的计算是否分布式的。如果收集数据集意味着所有之前完成的计算都不再分布式,那么就没有任何东西是分布式的了。 - Climbs_lika_Spyder
你有没有尝试运行你的代码?从另一个RDD中调用collect基本上是不可能的。你认为这应该如何实现?collect方法将触发作业执行并将结果作为Seq返回到驱动节点。但flatMap操作是作业的一部分,是分布式执行的。那么collect方法应该如何执行呢?此外,生成新的RDDs所必需的SparkContext根本无法序列化。因此,无法将其与您的UDF一起发送。 - Till Rohrmann
1
我不知道你所说的“ship it”是什么意思,但我已经运行了我最初发布的代码。它可以正常工作。@JacekLaskowski让我的代码更加紧凑,但我认为它仍然有效。 - Climbs_lika_Spyder

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接