使用Scala对象作为键的reduceByKey

6

我正在使用Scala的Spark,并且我有一个RDD,其中包含一个复杂对象作为键和一个double值的tuple2。目标是如果对象相同,则将double(频率)相加。

为此,我已经定义了以下对象:

    case class SimpleCoocurrence(word:String, word_pos:String, cooc:String, cooc_pos:String, distance:Double) extends Ordered[SimpleCoocurrence]{
      def compare(that: SimpleCoocurrence) = {
        if(this.word.equals(that.word)&&this.word_pos.equals(that.word_pos)
           &&this.cooc.equals(that.cooc)&&this.cooc_pos.equals(that.cooc_pos))
          0
        else
          this.toString.compareTo(that.toString)
      }
    }

现在我正在尝试使用reduceByKey,代码如下:
val coocRDD = sc.parallelize(coocList)
println(coocRDD.count)
coocRDD.map(tup=>tup).reduceByKey(_+_)
println(coocRDD.count)

但是结果表明,在reduceByKey处理前后,RDD包含的元素数量完全相同。

如何使用tuple2 [SimpleCoocurrence,Double]执行reduceByKey? 实现有序特征(Ordered trait)是告诉Spark如何比较对象的好方法吗? 我应该仅使用tuple2 [String, Double]吗?

谢谢!

3个回答

6

reduceByKey并不使用Ordering,而是使用hashCodeequals来确定哪些键是相同的。特别地,hashPartitioner将按散列分组键,以便具有相同hashCode的键落在同一分区,这样进一步的缩减可以在每个分区上发生。

案例类有一个equalshashCode的默认实现。可能使用的测试数据具有不同的distance:Double字段值,使得每个实例都是唯一的对象。将其用作键将导致只有相同的对象被归约为一个。

解决此问题的一种方法是为您的case class定义一个键,以及该对象的附加方法,例如:

case class SimpleCoocurrence(word:String, word_pos:String, cooc:String, cooc_pos:String, distance:Double) extends Serializable {
   val key = word + word_pos + cooc + cooc_pos
}
object SimpleCoocurrence {
   val add: (SimpleCoocurrence, SimpleCoocurrence) => SimpleCoocurrence = ???
}

val coocList:List[SimpleCoocurrence] = ???
val coocRDD = sc.parallelize(coocList)
val coocByKey = coocRDD.keyBy(_.key)
val addedCooc = coocByKey.reduceByKey(SimpleCoocurrence.add)

(*) 代码提供作为指导示例 - 未编译或测试。


0

首先,我很蠢...

接下来,如果有人遇到同样的问题并想在Spark上使用复杂的scala对象作为reduceByKey的键:

Spark知道如何比较两个对象,即使它们没有实现Ordered。所以上面的代码实际上是可行的。

唯一的问题是...我在打印相同的RDD之前和之后。当我写这个时,它实际上运行良好。

val coocRDD = sc.parallelize(coocList)
println(coocRDD.count)
val newRDD = coocRDD.map(tup=>tup).reduceByKey(_+_)
println(newRDD.count)

0

你没有存储 reduceByKey 的结果。请尝试这样做:

val coocRDD = sc.parallelize(coocList)
println(coocRDD.count)
val result = coocRDD.map(tup=>tup).reduceByKey(_+_)
println(result.count)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接