在Spark数据集中实现自己的reduceByKey

28

我正在尝试学习如何更多地使用DataFrames和DataSets,除了RDDs。 对于一个RDD,我知道可以使用someRDD.reduceByKey((x,y) => x + y),但是我在Dataset中没有看到那个函数。 因此,我决定自己写一个。

someRdd.map(x => ((x.fromId,x.toId),1)).map(x => collection.mutable.Map(x)).reduce((x,y) => {
  val result = mutable.HashMap.empty[(Long,Long),Int]
  val keys = mutable.HashSet.empty[(Long,Long)]
  y.keys.foreach(z => keys += z)
  x.keys.foreach(z => keys += z)
  for (elem <- keys) {
    val s1 = if(x.contains(elem)) x(elem) else 0
    val s2 = if(y.contains(elem)) y(elem) else 0
    result(elem) = s1 + s2
  }
  result
})

然而,这将一切都返回给驱动程序。你如何编写代码来返回一个Dataset? 也许可以使用mapPartition在那里完成?

请注意,这段代码可以编译,但不能运行,因为它还没有针对Map的编码器。


1
使用Spark 2.0.0,尝试这个:yourDataset.groupByKey(...).reduceGroups(...) - FelixHo
9
催化剂优化器是否会注意到您正在执行一组跟随一个规约,并使其更加高效? "高效" 是指在 RDD 上执行按键减少比先进行分组再进行减少更好的方式。 - Carlos Bribiescas
2个回答

40

我假设你的目标是将这个成语翻译成数据集:

rdd.map(x => (x.someKey, x.someField))
   .reduceByKey(_ + _)

// => returning an RDD of (KeyType, FieldType)

目前,我在使用Dataset API时找到的最接近的解决方案如下:

ds.map(x => (x.someKey, x.someField))          // [1]
  .groupByKey(_._1)                            
  .reduceGroups((a, b) => (a._1, a._2 + b._2))
  .map(_._2)                                   // [2]

// => returning a Dataset of (KeyType, FieldType)

// Comments:
// [1] As far as I can see, having a map before groupByKey is required
//     to end up with the proper type in reduceGroups. After all, we do
//     not want to reduce over the original type, but the FieldType.
// [2] required since reduceGroups converts back to Dataset[(K, V)]
//     not knowing that our V's are already key-value pairs.

看起来并不太优雅,而且根据快速基准测试,性能也要差得多,所以也许我们在这里遗漏了什么...

注意:另一种选择可能是将groupByKey(_.someKey)用作第一步。问题在于使用groupByKey会将类型从常规Dataset更改为KeyValueGroupedDataset。后者没有常规的map函数。它提供了一个mapGroups,但似乎不太方便,因为它将值包装成一个Iterator并根据docstring执行洗牌。


9
这样就可以了。只是需要注意的是,reduceByKey更加高效,因为它在分组前在每个节点上进行了缩减操作。而groupByKey则先对所有元素进行分组,然后再进行缩减。这就是为什么它的性能要低得多。有趣的是,在我还不知道reduceByKey之前,我就是这样做的,但是我已经忘记了 :-) - Carlos Bribiescas
1
@CarlosBribiescas 我在互联网上读到,数据集利用Spark的Catalyst优化器,并且应该能够在洗牌之前推送reduce函数。这可能解释了为什么Dataset API中没有reduceByKey方法。然而,根据我的经验,情况并非如此,groupByKey.reduceGroups洗牌的数据量更大,速度也更慢,相比之下reduceByKey更快。 - Justin Raymond
11
似乎从Spark 2.0.1和2.1.0开始,reduceGroups的性能问题已得到解决[Spark-16391](https://issues.apache.org/jira/browse/SPARK-16391)。 - Franzi
啊,是的。听起来它看起来就像reduceByKey一样工作。你知道是否有计划实现reduceByKey吗?从技术上讲,这个方法可以工作,但更冗长。 - Carlos Bribiescas
这个解决方案对我很有效,谢谢!但我有一个问题:你知道为什么reduceByKey不支持模式匹配吗?为了清晰起见,我想能够编写reduceByKey { case ((k1, v1), (k2, v2)) => (k1, v1 + v2) },但即使我在左侧添加类型注释,编译器也不喜欢它。 - Paul Siegel

9

一种更加高效的解决方案是在groupByKey之前使用mapPartitions来减少洗牌的次数(请注意,这个函数签名与reduceByKey不完全相同,但我认为通过传递一个函数要比要求数据集由元组组成更加灵活)。

def reduceByKey[V: ClassTag, K](ds: Dataset[V], f: V => K, g: (V, V) => V)
  (implicit encK: Encoder[K], encV: Encoder[V]): Dataset[(K, V)] = {
  def h[V: ClassTag, K](f: V => K, g: (V, V) => V, iter: Iterator[V]): Iterator[V] = {
    iter.toArray.groupBy(f).mapValues(_.reduce(g)).map(_._2).toIterator
  }
  ds.mapPartitions(h(f, g, _))
    .groupByKey(f)(encK)
    .reduceGroups(g)
}

根据数据的形状和大小,这与reduceByKey的性能相差不到1秒,并且比groupByKey(_._1).reduceGroups快大约2倍。仍有改进空间,欢迎提出建议。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接