使用字节数组作为键的ReduceByKey

3

我希望能够使用Tuple2<byte[], obj>的RDD对进行工作,但是因为它们的引用值不同,拥有相同内容的byte[]被视为不同的值。

我没有看到任何传递自定义比较器的方法。我可以将byte[]转换为一个显式字符集的String,但我想知道是否有更高效的方法。

2个回答

5

自定义比较器是不够的,因为Spark使用对象的hashCode来组织分区中的键。(至少HashPartitioner会这样做,你可以提供一个自定义分区器来处理数组)

包装数组以提供适当的equalshashCode应该解决问题。一个轻量级的包装器就可以搞定:

class SerByteArr(val bytes: Array[Byte]) extends Serializable {
    override val hashCode = bytes.deep.hashCode
    override def equals(obj:Any) = obj.isInstanceOf[SerByteArr] && obj.asInstanceOf[SerByteArr].bytes.deep == this.bytes.deep
}

一个快速测试:

import scala.util.Random
val data = (1 to 100000).map(_ => Random.nextInt(100).toString.getBytes("UTF-8"))
val rdd = sparkContext.parallelize(data)
val byKey = rdd.keyBy(identity)
// this won't work b/c the partitioner does not support arrays as keys
val grouped = byKey.groupByKey
// org.apache.spark.SparkException: Default partitioner cannot partition array keys.

// let's use the wrapper instead   

val keyable = rdd.map(elem =>  new SerByteArr(elem))
val bySerKey = keyable.keyBy(identity)
val grouped = bySerKey.groupByKey
grouped.count
// res14: Long = 100

好的,我用的是Java,不是Scala。我会尝试使用Java的等效方法。 - neverendingqs
我利用了这里的代码片段:https://dev59.com/M3VD5IYBdhLWcg3wTJrF#27609。`Arrays.equals()`和`Arrays.hashCode()`使它变得更容易了。 - neverendingqs

1
你可以创建一个包装类并定义自己的相等/比较函数。这可能会更快一些,因为您不必复制数组(尽管仍然有对象分配)。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接