如何使用Pyspark Mllib RegressionMetrics和真实预测结果

4

我正在尝试使用pyspark 1.4中的RegressionMetrics()来处理由LinearRegressionWithSGD生成的预测结果。

所有RegressionMetrics()的示例都是针对“人工”预测和观测数据,例如pyspark mllib文档中所给出的示例。

predictionAndObservations = sc.parallelize([ (2.5, 3.0), (0.0, -0.5), (2.0, 2.0), (8.0, 7.0)])

对于这种使用sc.parallelize生成的“人工”RDD,一切都正常。然而,当我用另一种生成方式生成的另一个RDD进行相同操作时,出现了问题。

TypeError: DoubleType can not accept object in type <type 'numpy.float64'>

下面是一个简短的可重现示例。
问题可能是什么?
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.regression import LinearRegressionWithSGD, LinearRegressionModel
from pyspark.mllib.evaluation import RegressionMetrics

dataRDD = sc.parallelize([LabeledPoint(1, [1,1]), LabeledPoint(2, [2,2]), LabeledPoint(3, [3,3])])
lrModel = LinearRegressionWithSGD.train(dataRDD)
prediObserRDD = dataRDD.map(lambda p: (lrModel.predict(p.features), p.label)).cache()

让我们检查一下RDD是否确实是(预测,观察)对。

prediObserRDD.take(4) # looks OK

现在尝试计算指标。
metrics = RegressionMetrics(prediObserRDD)

它给出了以下错误。
TypeError                                 Traceback (most recent call last)
<ipython-input-1-ca9ad8e9faf1> in <module>()
      7 prediObserRDD = dataRDD.map(lambda p: (lrModel.predict(p.features), p.label)).cache()
      8 prediObserRDD.take(4)
----> 9 metrics = RegressionMetrics(prediObserRDD)
     10 #metrics.explainedVariance
     11 #metrics.meanAbsoluteError

/usr/local/spark-1.4.0-bin-hadoop2.6/python/pyspark/mllib/evaluation.py in __init__(self, predictionAndObservations)
     99         df = sql_ctx.createDataFrame(predictionAndObservations, schema=StructType([
    100             StructField("prediction", DoubleType(), nullable=False),
--> 101             StructField("observation", DoubleType(), nullable=False)]))
    102         java_class = sc._jvm.org.apache.spark.mllib.evaluation.RegressionMetrics
    103         java_model = java_class(df._jdf)

/usr/local/spark-1.4.0-bin-hadoop2.6/python/pyspark/sql/context.py in createDataFrame(self, data, schema, samplingRatio)
    337 
    338         for row in rows:
--> 339             _verify_type(row, schema)
    340 
    341         # convert python objects to sql data

/usr/local/spark-1.4.0-bin-hadoop2.6/python/pyspark/sql/types.py in _verify_type(obj, dataType)
   1027                              "length of fields (%d)" % (len(obj), len(dataType.fields)))
   1028         for v, f in zip(obj, dataType.fields):
-> 1029             _verify_type(v, f.dataType)
   1030 
   1031 _cached_cls = weakref.WeakValueDictionary()

/usr/local/spark-1.4.0-bin-hadoop2.6/python/pyspark/sql/types.py in _verify_type(obj, dataType)
   1011     if type(obj) not in _acceptable_types[_type]:
   1012         raise TypeError("%s can not accept object in type %s"
-> 1013                         % (dataType, type(obj)))
   1014 
   1015     if isinstance(dataType, ArrayType):

TypeError: DoubleType can not accept object in type <type 'numpy.float64'>

同样的问题也出现在BinaryClassificationMetrics中(针对另一个数据集和分类任务)。

@eliasah 感谢您的评论,我在Spark编程指南中发现DoubleWritable对应于Python类型“float”。因此,我将值转换为浮点数,现在一切都正常了。请将您的评论发布为答案,以便我可以接受它。 - lanenok
顺便说一下,这是Spark的意外行为。例如,DenseVector就是numpy数组。自动将所有numpy.float类型转换为DoubleType是合理的。 - lanenok
1个回答

7

就像错误提示所说:TypeError: DoubleType无法接受类型为<type 'numpy.float64'>的对象

您正在尝试将numpy.float64转换为无法执行的Double。

要解决这个TypeError,您需要将值转换为可接受的类型。

例如:

from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.regression import LinearRegressionWithSGD, LinearRegressionModel
from pyspark.mllib.evaluation import RegressionMetrics

dataRDD = sc.parallelize([LabeledPoint(1, [1,1]), LabeledPoint(2, [2,2]), LabeledPoint(3, [3,3])])
lrModel = LinearRegressionWithSGD.train(dataRDD)
prediObserRDD = dataRDD.map(lambda p: (float(lrModel.predict(p.features)), p.label)).cache()

如果您注意到了,我使用Python内置的 float 函数将预测标签转换为双精度数字。
现在您可以计算指标了:
>>> metrics = RegressionMetrics(prediObserRDD)
>>> metrics.explainedVariance
1.0

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接