Computing the cost of KMeans


I am using this model, but it was not written by me. To predict the centroids, I have to do the following:

model = cPickle.load(open("/tmp/model_centroids_128d_pkl.lopq", "rb"))
codes = d.map(lambda x: (x[0], model.predict_coarse(x[1])))

Calling `d.first()` returns the following:

(u'3768915289',
 array([ -86.00641097, -100.41325623,   <128 coords in total>]))

and `codes.first()`:

(u'3768915289', (5657, 7810))

How can I compute computeCost() for this KMeans model?
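For reference, computeCost() returns the Within Set Sum of Squared Errors (WSSSE): for every point, the squared Euclidean distance to its closest centroid, summed over the dataset. A minimal plain-Python sketch of that definition (toy 2-d data standing in for the 128-d vectors above):

```python
# WSSSE sketch: sum of squared distances from each point to its
# nearest centroid -- this is what KMeansModel.computeCost returns.
def compute_cost(points, centers):
    return sum(
        min(
            sum((p - c) ** 2 for p, c in zip(pt, ct))
            for ct in centers
        )
        for pt in points
    )

centers = [(0.0, 0.0), (10.0, 10.0)]
points = [(1.0, 0.0), (9.0, 10.0)]
print(compute_cost(points, centers))  # 1.0 + 1.0 = 2.0
```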


After reading train_model.py, I tried the following:

In [23]: from pyspark.mllib.clustering import KMeans, KMeansModel
In [24]: Cs = model.Cs     # centroids
In [25]: model = KMeansModel(Cs[0]) # I am very positive this line is good
In [26]: costs = d.map(lambda x: model.computeCost(x[1]))
In [27]: costs.first()

But I got this error:
AttributeError: 'numpy.ndarray' object has no attribute 'map'

This means that under the hood Spark tries to call map() on x[1], so it expects an RDD! But in what form?
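The AttributeError is easy to reproduce outside Spark: computeCost() starts by calling .map(...) on its argument (an RDD method), and a NumPy array has no such method:

```python
import numpy as np

# computeCost(rdd) immediately calls rdd.map(...); passing a NumPy
# array instead of an RDD therefore fails exactly like this:
arr = np.array([7.17e+01, 1.08e+01])
try:
    arr.map(lambda v: v)
except AttributeError as e:
    print(e)  # 'numpy.ndarray' object has no attribute 'map'
```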

I am now trying:

d = d.map(lambda x: x[1])
d.first()
array([  7.17036494e+01,   1.07987890e+01, ...])
costs = model.computeCost(d)

and I get this error:

16/08/30 00:39:21 WARN TaskSetManager: Lost task 821.0 in stage 40.0 : java.lang.IllegalArgumentException: requirement failed
    at scala.Predef$.require(Predef.scala:221)
    at org.apache.spark.mllib.util.MLUtils$.fastSquaredDistance(MLUtils.scala:330)
    at org.apache.spark.mllib.clustering.KMeans$.fastSquaredDistance(KMeans.scala:595)
    at org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:569)
    at org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:563)
    at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:73)
    at org.apache.spark.mllib.clustering.KMeans$.findClosest(KMeans.scala:563)
    at org.apache.spark.mllib.clustering.KMeans$.pointCost(KMeans.scala:586)
    at org.apache.spark.mllib.clustering.KMeansModel$$anonfun$computeCost$1.apply(KMeansModel.scala:88)
    at org.apache.spark.mllib.clustering.KMeansModel$$anonfun$computeCost$1.apply(KMeansModel.scala:88)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.fold(TraversableOnce.scala:199)
    at scala.collection.AbstractIterator.fold(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$fold$1$$anonfun$19.apply(RDD.scala:1086)
    at org.apache.spark.rdd.RDD$$anonfun$fold$1$$anonfun$19.apply(RDD.scala:1086)
    at org.apache.spark.SparkContext$$anonfun$36.apply(SparkContext.scala:1951)
    at org.apache.spark.SparkContext$$anonfun$36.apply(SparkContext.scala:1951)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-44-6223595c8b5f> in <module>()
----> 1 costs = model.computeCost(d)

/home/gs/spark/current/python/pyspark/mllib/clustering.py in computeCost(self, rdd)
    140         """
    141         cost = callMLlibFunc("computeCostKmeansModel", rdd.map(_convert_to_vector),
--> 142                              [_convert_to_vector(c) for c in self.centers])
    143         return cost
    144 

/home/gs/spark/current/python/pyspark/mllib/common.py in callMLlibFunc(name, *args)
    128     sc = SparkContext.getOrCreate()
    129     api = getattr(sc._jvm.PythonMLLibAPI(), name)
--> 130     return callJavaFunc(sc, api, *args)
    131 
    132 

/home/gs/spark/current/python/pyspark/mllib/common.py in callJavaFunc(sc, func, *args)
    121     """ Call Java Function """
    122     args = [_py2java(sc, a) for a in args]
--> 123     return _java2py(sc, func(*args))
    124 
    125 

/home/gs/spark/current/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
    811         answer = self.gateway_client.send_command(command)
    812         return_value = get_return_value(
--> 813             answer, self.gateway_client, self.target_id, self.name)
    814 
    815         for temp_arg in temp_args:

/home/gs/spark/current/python/pyspark/sql/utils.py in deco(*a, **kw)
     43     def deco(*a, **kw):
     44         try:
---> 45             return f(*a, **kw)
     46         except py4j.protocol.Py4JJavaError as e:
     47             s = e.java_exception.toString()

/home/gs/spark/current/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    306                 raise Py4JJavaError(
    307                     "An error occurred while calling {0}{1}{2}.\n".
--> 308                     format(target_id, ".", name), value)
    309             else:
    310                 raise Py4JError(

Py4JJavaError: An error occurred while calling o25177.computeCostKmeansModel.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 821 in stage 40.0 failed 4 times, most recent failure: Lost task 821.3 in stage 40.0: java.lang.IllegalArgumentException: requirement failed
    at scala.Predef$.require(Predef.scala:221)
    at org.apache.spark.mllib.util.MLUtils$.fastSquaredDistance(MLUtils.scala:330)
    at org.apache.spark.mllib.clustering.KMeans$.fastSquaredDistance(KMeans.scala:595)
    at org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:569)
    at org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:563)
    at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:73)
    at org.apache.spark.mllib.clustering.KMeans$.findClosest(KMeans.scala:563)
    at org.apache.spark.mllib.clustering.KMeans$.pointCost(KMeans.scala:586)
    at org.apache.spark.mllib.clustering.KMeansModel$$anonfun$computeCost$1.apply(KMeansModel.scala:88)
    at org.apache.spark.mllib.clustering.KMeansModel$$anonfun$computeCost$1.apply(KMeansModel.scala:88)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.fold(TraversableOnce.scala:199)
    at scala.collection.AbstractIterator.fold(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$fold$1$$anonfun$19.apply(RDD.scala:1086)
    at org.apache.spark.rdd.RDD$$anonfun$fold$1$$anonfun$19.apply(RDD.scala:1086)
    at org.apache.spark.SparkContext$$anonfun$36.apply(SparkContext.scala:1951)
    at org.apache.spark.SparkContext$$anonfun$36.apply(SparkContext.scala:1951)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
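The `requirement failed` at `MLUtils.fastSquaredDistance` (the top of both traces) looks like a dimension check: both vectors must have the same size, and here 128-d data points are being compared against 64-d centroids. A toy reproduction of that check (plain Python, not the actual Spark implementation):

```python
# Sketch of the size check that apparently throws
# "requirement failed": the point and the centroid must have
# the same number of dimensions before a distance is computed.
def fast_squared_distance(v1, v2):
    if len(v1) != len(v2):
        raise ValueError("requirement failed: %d != %d" % (len(v1), len(v2)))
    return sum((a - b) ** 2 for a, b in zip(v1, v2))

point = [0.0] * 128      # a full 128-d data vector
center = [0.0] * 64      # one 64-d coarse centroid
try:
    fast_squared_distance(point, center)
except ValueError as e:
    print(e)  # requirement failed: 128 != 64
```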

EDIT:

split_vecs = d.map(lambda x: np.split(x[1], 2))

seems like a good step, since the centroids are 64-dimensional.
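A quick sanity check (NumPy assumed available, as in the snippets above) that np.split(x, 2) really produces two 64-d halves matching the centroid dimensionality:

```python
import numpy as np

# One 128-d vector (stand-in for x[1]) split into two 64-d halves.
vec = np.arange(128, dtype=float)
halves = np.split(vec, 2)
print(len(halves), halves[0].shape, halves[1].shape)  # 2 (64,) (64,)
```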

model.computeCost((d.map(lambda x: x[1])).first())

fails with: AttributeError: 'numpy.ndarray' object has no attribute 'map'


Can you predict the cluster of any single sample? Something like model.predict(d.map(lambda x: x[1]).first()). - Alberto Bonsanto
@AlbertoBonsanto, do you mean with the d from my first attempt? And which model? In my question I load it only once, but in the edit I reassign it... I have tried several things but none of them work... Let me know what you mean and I will update my question. - gsamaras
I tried with an RDD of fewer than 15 elements (using takeSample() when reading the file) @AlbertoBonsanto. - gsamaras
@AlbertoBonsanto, if I do something like split_vecs = d.map(lambda x: np.split(x[1], 2)) and then model.predict(split_vecs.map(lambda x: x[1]).first()), I get a number as output, but I am not sure it is correct. - gsamaras
1 Answer

From what I have read in the documentation, you need to:
  1. Create a model, either by reading a previously saved one or by fitting a new one.

  2. Once you have that model, you can use its method computeCost, which requires a well-formed RDD in order to output anything useful.

So, assuming your variable model holds a KMeansModel and the data stored in the variable d has the expected representation, you should be able to run:
model.computeCost(d)

EDIT:

You should create an RDD containing vectors with the same dimensionality as the centroids, and pass it to computeCost(), for example:

split_vecs = d.map(lambda x: (x[0], np.split(x[1], 2)))
costs_per_split = [KMeansModel(model.Cs[i]).computeCost(split_vecs.map(lambda x: x[1][i])) for i in range(2)]
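The comprehension above can be mirrored in plain Python to see what is computed per split; toy 2-d centroids stand in for model.Cs[i] and the 64-d halves:

```python
# Per-split WSSSE: split i's half-vectors are scored against the
# i-th coarse codebook, as in the list comprehension above.
def cost(points, centers):
    return sum(
        min(sum((p - c) ** 2 for p, c in zip(pt, ct)) for ct in centers)
        for pt in points
    )

Cs = [
    [(0.0, 0.0), (5.0, 5.0)],   # stand-in for model.Cs[0]
    [(1.0, 1.0), (9.0, 9.0)],   # stand-in for model.Cs[1]
]
rows = [((0.0, 1.0), (1.0, 2.0)),   # each row: (first half, second half)
        ((5.0, 4.0), (8.0, 9.0))]
costs_per_split = [cost([r[i] for r in rows], Cs[i]) for i in range(2)]
print(costs_per_split)  # [2.0, 2.0]
```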
