在Python中从Spark DataFrame创建带标签的点(labeledPoints)

14

我应该使用Python中的哪个.map()函数来从Spark数据框创建一组labeledPoints?如果标签/结果不是第一列,但我可以引用它的列名'status',该怎么写?

我使用以下代码创建Python数据框:

def parsePoint(line):
    listmp = list(line.split('\t'))
    dataframe = pd.DataFrame(pd.get_dummies(listmp[1:]).sum()).transpose()
    dataframe.insert(0, 'status', dataframe['accepted'])
    if 'NULL' in dataframe.columns:
        dataframe = dataframe.drop('NULL', axis=1)  
    if '' in dataframe.columns:
        dataframe = dataframe.drop('', axis=1)  
    if 'rejected' in dataframe.columns:
        dataframe = dataframe.drop('rejected', axis=1)  
    if 'accepted' in dataframe.columns:
        dataframe = dataframe.drop('accepted', axis=1)  
    return dataframe 

在 reduce 函数重新组合所有 Pandas 数据帧之后,我将其转换为 Spark 数据帧。

parsedData=sqlContext.createDataFrame(parsedData)

但现在我该如何在Python中从这个数据集创建labeledPoints?我假设可能需要使用另一个.map()函数吗?
1个回答

17
如果您已经具有数值特征,而且不需要其他转换,您可以使用VectorAssembler来合并包含自变量的列:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["your", "independent", "variables"],
    outputCol="features")

transformed = assembler.transform(parsedData)

接下来,您只需简单地映射:

from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.functions import col

(transformed.select(col("outcome_column").alias("label"), col("features"))
  .rdd
  .map(lambda row: LabeledPoint(row.label, row.features)))

从Spark 2.0版本开始,mlmllib API不再兼容,后者正朝着废弃和删除的方向发展。如果您仍需要使用它,您必须将ml.Vectors转换为mllib.Vectors

from pyspark.mllib import linalg as mllib_linalg
from pyspark.ml import linalg as ml_linalg

def as_old(v):
    if isinstance(v, ml_linalg.SparseVector):
        return mllib_linalg.SparseVector(v.size, v.indices, v.values)
    if isinstance(v, ml_linalg.DenseVector):
        return mllib_linalg.DenseVector(v.values)
    raise ValueError("Unsupported type {0}".format(type(v)))

和地图:

lambda row: LabeledPoint(row.label, as_old(row.features)))

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接