我正在尝试学习如何更多地使用DataFrames和DataSets,除了RDDs。 对于一个RDD,我知道可以使用someRDD.reduceByKey((x,y) => x + y)
,但是我在Dataset中没有看到那个函数。 因此,我决定自己写一个。
someRdd.map(x => ((x.fromId,x.toId),1)).map(x => collection.mutable.Map(x)).reduce((x,y) => {
val result = mutable.HashMap.empty[(Long,Long),Int]
val keys = mutable.HashSet.empty[(Long,Long)]
y.keys.foreach(z => keys += z)
x.keys.foreach(z => keys += z)
for (elem <- keys) {
val s1 = if(x.contains(elem)) x(elem) else 0
val s2 = if(y.contains(elem)) y(elem) else 0
result(elem) = s1 + s2
}
result
})
然而,这将一切都返回给驱动程序。你如何编写代码来返回一个Dataset
? 也许可以使用mapPartition
在那里完成?
请注意,这段代码可以编译,但不能运行,因为它还没有针对Map
的编码器。