我有一个类型为
RDD[String]
的wordRDD
。我还有一个可以从字符串/单词创建RDD[String]
的函数。我想为wordRDD
中的每个字符串创建一个新的RDD。以下是我的尝试:
1)失败了,因为Spark不支持嵌套RDD:
var newRDD = wordRDD.map( word => {
// execute myFunction()
(new MyClass(word)).myFunction()
})
2) 失败(可能是由于作用域问题导致的):
var newRDD = sc.parallelize(new Array[String](0))
val wordArray = wordRDD.collect
for (w <- wordArray){
newRDD = sc.union(newRDD,(new MyClass(w)).myFunction())
}
我的理想结果应该是:
// input RDD (wordRDD)
wordRDD: org.apache.spark.rdd.RDD[String] = ('apple','banana','orange'...)
// myFunction behavior
new MyClass('apple').myFunction(): RDD[String] = ('pple','aple'...'appl')
// after executing myFunction() on each word in wordRDD:
newRDD: RDD[String] = ('pple','aple',...,'anana','bnana','baana',...)
我在这里找到了一个相关的问题:Spark当合并大量RDD时会抛出堆栈溢出错误,但它没有解决我的问题。
wordRDD.map
中发生的所有事情都在集群上执行。因此,内部的collect
必须从正在运行的作业中触发一个新的Spark作业。我怀疑它不会分布式地运行。 - Till RohrmannmyFunction
,可以从一个字符串/单词创建一个RDD[String]
。 - Till RohrmannmyFunction
更改为返回不同的内容。不知道这个函数有多复杂,很难说其中的计算是否分布式的。如果收集数据集意味着所有之前完成的计算都不再分布式,那么就没有任何东西是分布式的了。 - Climbs_lika_SpyderRDD
中调用collect
基本上是不可能的。你认为这应该如何实现?collect
方法将触发作业执行并将结果作为Seq
返回到驱动节点。但flatMap
操作是作业的一部分,是分布式执行的。那么collect
方法应该如何执行呢?此外,生成新的RDDs
所必需的SparkContext
根本无法序列化。因此,无法将其与您的UDF一起发送。 - Till Rohrmann