Spark: value reduceByKey is not a member


After clustering some sparse vectors, I need to find the intersection vector for each cluster. To achieve this, I am trying to reduce the MLlib vectors, following this example:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

//For Sparse Vector
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg.{Vector, Vectors}

object Recommend {

  def main(args: Array[String]) {
    // set up environment
    val conf = new SparkConf()
      .setAppName("Test")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)

    // Some vectors
    val vLen = 1800
    val sv11: Vector = Vectors.sparse(vLen,Seq( (100,1.0), (110,1.0), (120,1.0), (130, 1.0) ))
    val sv12: Vector = Vectors.sparse(vLen,Seq( (100,1.0), (110,1.0), (120,1.0), (130, 1.0), (140, 1.0)  ))
    val sv13: Vector = Vectors.sparse(vLen,Seq( (100,1.0), (120,1.0), (130,1.0) ))
    val sv14: Vector = Vectors.sparse(vLen,Seq( (110,1.0), (130, 1.0) ))
    val sv15: Vector = Vectors.sparse(vLen,Seq( (140, 1.0) ))

    val sv21: Vector = Vectors.sparse(vLen,Seq( (200,1.0), (210,1.0), (220,1.0), (230, 1.0) ))
    val sv22: Vector = Vectors.sparse(vLen,Seq( (200,1.0), (210,1.0), (220,1.0), (230, 1.0), (240, 1.0)  ))
    val sv23: Vector = Vectors.sparse(vLen,Seq( (200,1.0), (220,1.0), (230,1.0) ))
    val sv24: Vector = Vectors.sparse(vLen,Seq( (210,1.0), (230, 1.0) ))
    val sv25: Vector = Vectors.sparse(vLen,Seq( (240, 1.0) ))

    val sv31: Vector = Vectors.sparse(vLen,Seq( (300,1.0), (310,1.0), (320,1.0), (330, 1.0) ))
    val sv32: Vector = Vectors.sparse(vLen,Seq( (300,1.0), (310,1.0), (320,1.0), (330, 1.0), (340, 1.0)  ))
    val sv33: Vector = Vectors.sparse(vLen,Seq( (300,1.0), (320,1.0), (330,1.0) ))
    val sv34: Vector = Vectors.sparse(vLen,Seq( (310,1.0), (330, 1.0) ))
    val sv35: Vector = Vectors.sparse(vLen,Seq( (340, 1.0) ))

    val sparseData = sc.parallelize(Seq(
        sv11, sv12, sv13, sv14, sv15,
        sv21, sv22, sv23, sv24, sv25,
        sv31, sv32, sv33, sv34, sv35
        ))

    // Cluster the data into two classes using KMeans
    val numClusters = 3
    val numIterations = 20

    test(numClusters, numIterations, sparseData)
  }

  def test(numClusters:Int, numIterations:Int,
      data: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]) = {

    val clusters = KMeans.train(data, numClusters, numIterations)

    val predictions = data.map(v => (clusters.predict(v), v) )

    predictions.reduceByKey((v1, v2) => v1)

  }
}

The line predictions.reduceByKey((v1, v2) => v1) causes the error:

value reduceByKey is not a member of org.apache.spark.rdd.RDD[(Int, org.apache.spark.mllib.linalg.Vector)]

What is causing this?

Possible duplicate of "reduceByKey method not being found in Scala Spark" - Aleksandar Stojadinovic
Thanks for the workaround )) - zork
1 Answer

As you have already guessed, your code needs this import added:
import org.apache.spark.SparkContext._

It brings several implicit conversions into scope, the most important of which (in your case) is the PairRDD conversion. When you have an RDD of tuples, Spark treats the left element as the key, which gives you access to a number of convenient transformations and actions such as reduceByKey.
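For example, here is a minimal sketch of the fixed test method with the import added (assuming Spark 1.x; note that since Spark 1.3 the pair-RDD implicits live on the RDD companion object, so the explicit SparkContext import is only required on older versions):

import org.apache.spark.SparkContext._   // brings the PairRDDFunctions implicits into scope (pre-1.3)
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

def test(numClusters: Int, numIterations: Int, data: RDD[Vector]): RDD[(Int, Vector)] = {
  val clusters = KMeans.train(data, numClusters, numIterations)
  // An RDD of (clusterId, vector) pairs
  val predictions = data.map(v => (clusters.predict(v), v))
  // reduceByKey now resolves through the implicit conversion to PairRDDFunctions
  predictions.reduceByKey((v1, v2) => v1)
}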
Regards,
