Pyspark and PCA: How can I extract the eigenvectors of this PCA? How can I calculate how much variance they explain?

26

I am using pyspark (with the spark ml library) together with the PCA model to reduce the dimensionality of a Spark DataFrame, with the following code:

pca = PCA(k=3, inputCol="features", outputCol="pca_features")
model = pca.fit(data)

where data is a Spark DataFrame with one column named features, which is a 3-dimensional DenseVector:

data.take(1)
Row(features=DenseVector([0.4536,-0.43218, 0.9876]), label=u'class1')

After fitting, I transform the data:

transformed = model.transform(data)
transformed.first()
Row(features=DenseVector([0.4536,-0.43218, 0.9876]), label=u'class1', pca_features=DenseVector([-0.33256, 0.8668, 0.625]))

How can I extract the eigenvectors of this PCA? How can I calculate how much variance they explain?

4 Answers

31
[UPDATE: From Spark 2.2 onwards, both PCA and SVD are available in PySpark - see JIRA ticket SPARK-6227 and PCA & PCAModel for Spark ML 2.2; the original answer below is still applicable for older Spark versions.]
Well, it seems incredible, but indeed there is not (at least as of Spark 1.5) a way to extract such information from a PCA decomposition. But again, there have been many similar "complaints" - see here, for example, about not being able to extract the best parameters from a CrossValidatorModel.
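For reference, with the Spark 2.2+ API this boils down to a couple of attribute accesses on the fitted model from the question (a minimal sketch; PCAModel exposes pc and explainedVariance):

model = PCA(k=3, inputCol="features", outputCol="pca_features").fit(data)
model.pc                 # DenseMatrix (d x k): columns are the principal components (eigenvectors)
model.explainedVariance  # DenseVector of length k: proportion of variance explained by each component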

Fortunately, a few months ago I attended the 'Scalable Machine Learning' MOOC created by AMPLab (Berkeley) and Databricks, i.e. the creators of Spark, where we implemented a full PCA pipeline "by hand" as part of the homework assignments. I have modified my functions from back then (rest assured, I got full credit :-) ) so as to work with dataframes as input (instead of RDDs), of the same format as yours, i.e. rows of DenseVectors containing the numerical features.

First, we need to define an intermediate function, estimateCovariance, as follows:

import numpy as np

def estimateCovariance(df):
    """Compute the covariance matrix for a given dataframe.

    Note:
        The multi-dimensional covariance array should be calculated using outer products.  Don't
        forget to normalize the data by first subtracting the mean.

    Args:
        df:  A Spark dataframe with a column named 'features', which (column) consists of DenseVectors.

    Returns:
        np.ndarray: A multi-dimensional array where the number of rows and columns both equal the
            length of the arrays in the input dataframe.
    """
    m = df.select(df['features']).map(lambda x: x[0]).mean()
    dfZeroMean = df.select(df['features']).map(lambda x: x[0]).map(lambda x: x - m)  # subtract the mean

    return dfZeroMean.map(lambda x: np.outer(x,x)).sum()/df.count()
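As a side note (not part of the original answer): Spark 2.x DataFrames no longer expose .map directly, so a roughly equivalent version, assuming Spark 2.x+, would go through df.rdd, e.g.:

def estimateCovarianceRDD(df):
    """Hypothetical Spark 2.x+ adaptation of estimateCovariance above, via df.rdd."""
    n = df.count()
    rdd = df.select('features').rdd.map(lambda row: row[0].toArray())
    m = rdd.sum() / n                                    # feature-wise mean
    return rdd.map(lambda v: np.outer(v - m, v - m)).sum() / n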

Then, we can write a main pca function, as follows:
from numpy.linalg import eigh

def pca(df, k=2):
    """Computes the top `k` principal components, corresponding scores, and all eigenvalues.

    Note:
        All eigenvalues should be returned in sorted order (largest to smallest). `eigh` returns
        each eigenvectors as a column.  This function should also return eigenvectors as columns.

    Args:
        df: A Spark dataframe with a 'features' column, which (column) consists of DenseVectors.
        k (int): The number of principal components to return.

    Returns:
        tuple of (np.ndarray, RDD of np.ndarray, np.ndarray): A tuple of (eigenvectors, `RDD` of
        scores, eigenvalues).  Eigenvectors is a multi-dimensional array where the number of
        rows equals the length of the arrays in the input `RDD` and the number of columns equals
        `k`.  The `RDD` of scores has the same number of rows as `data` and consists of arrays
        of length `k`.  Eigenvalues is an array of length d (the number of features).
     """
    cov = estimateCovariance(df)
    col = cov.shape[1]
    eigVals, eigVecs = eigh(cov)
    inds = np.argsort(eigVals)
    eigVecs = eigVecs.T[inds[-1:-(col+1):-1]]  
    components = eigVecs[0:k]
    eigVals = eigVals[inds[-1:-(col+1):-1]]  # sort eigenvals
    score = df.select(df['features']).map(lambda x: x[0]).map(lambda x: np.dot(x, components.T) )
    # Return the `k` principal components, `k` scores, and all eigenvalues

    return components.T, score, eigVals

Test

First, let's see the results with the existing method, using the example data from the Spark ML PCA documentation (modifying them so as to be all DenseVectors):

 from pyspark.ml.feature import *
 from pyspark.mllib.linalg import Vectors
 data = [(Vectors.dense([0.0, 1.0, 0.0, 7.0, 0.0]),),
         (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
         (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
 df = sqlContext.createDataFrame(data,["features"])
 pca_extracted = PCA(k=2, inputCol="features", outputCol="pca_features")
 model = pca_extracted.fit(df)
 model.transform(df).collect()

 [Row(features=DenseVector([0.0, 1.0, 0.0, 7.0, 0.0]), pca_features=DenseVector([1.6486, -4.0133])),
  Row(features=DenseVector([2.0, 0.0, 3.0, 4.0, 5.0]), pca_features=DenseVector([-4.6451, -1.1168])),
  Row(features=DenseVector([4.0, 0.0, 0.0, 6.0, 7.0]), pca_features=DenseVector([-6.4289, -5.338]))]

And then, with our own method:
 comp, score, eigVals = pca(df)
 score.collect()

 [array([ 1.64857282,  4.0132827 ]),
  array([-4.64510433,  1.11679727]),
  array([-6.42888054,  5.33795143])]

Let me stress that we don't use any collect() methods in the functions we have defined above - score is an RDD, as it should be.
Notice that the signs of our second column are all opposite of the ones derived with the existing method; but this is not an issue: according to the (freely downloadable) An Introduction to Statistical Learning, co-authored by Hastie & Tibshirani, p. 382:

Each principal component loading vector is unique, up to a sign flip. This means that two different software packages will yield the same principal component loading vectors, although the signs of those loading vectors may differ. The signs may differ because each principal component loading vector specifies a direction in p-dimensional space: flipping the sign has no effect as the direction does not change. [...] Similarly, the score vectors are unique up to a sign flip, since the variance of Z is the same as the variance of −Z.
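Just as a quick sanity check (not part of the original answer), one can verify numerically that the two sets of scores agree up to a sign flip, reusing the df, model, and score objects from above:

ours = np.array(score.collect())
theirs = np.array([row.pca_features.toArray() for row in model.transform(df).collect()])
np.allclose(np.abs(ours), np.abs(theirs), atol=1e-4)  # expected: True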

Finally, now that we have the eigenvalues available, it is trivial to write a function for the percentage of variance explained:
 def varianceExplained(df, k=1):
     """Calculate the fraction of variance explained by the top `k` eigenvectors.

     Args:
         df: A Spark dataframe with a 'features' column, which (column) consists of DenseVectors.
         k: The number of principal components to consider.

     Returns:
         float: A number between 0 and 1 representing the percentage of variance explained
             by the top `k` eigenvectors.
     """
     components, scores, eigenvalues = pca(df, k)  
     return sum(eigenvalues[0:k])/sum(eigenvalues)

 
 varianceExplained(df,1)
 # 0.79439325322305299

As a test, we also check whether the variance explained in our example data is 1.0 for k=5 (since the original data are 5-dimensional):
 varianceExplained(df,5)
 # 1.0
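Equivalently (a small aside, not part of the original answer), since our pca function returns all the eigenvalues sorted in descending order, the whole vector of per-component ratios can be read off directly:

 comp, score, eigVals = pca(df)
 eigVals / eigVals.sum()
 # the first entry is ~0.7944, matching varianceExplained(df, 1), and the entries sum to 1.0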

[Developed & tested with Spark 1.5.0 & 1.5.1]


15

EDIT:

PCA and SVD are finally both available in pyspark, starting from Spark 2.2.0, according to the resolved JIRA ticket SPARK-6227.

Original answer:

The answer given by @desertnaut is actually excellent from a theoretical perspective, but I wanted to present another approach on how to compute the SVD and extract the eigenvectors.

from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
from pyspark.mllib.linalg.distributed import RowMatrix

class SVD(JavaModelWrapper):
    """Wrapper around the SVD scala case class"""
    @property
    def U(self):
        """ Returns a RowMatrix whose columns are the left singular vectors of the SVD if computeU was set to be True."""
        u = self.call("U")
        if u is not None:
            return RowMatrix(u)

    @property
    def s(self):
        """Returns a DenseVector with singular values in descending order."""
        return self.call("s")

    @property
    def V(self):
        """ Returns a DenseMatrix whose columns are the right singular vectors of the SVD."""
        return self.call("V")

This defines our SVD object. We can now define our computeSVD method using the Java wrapper.

def computeSVD(row_matrix, k, computeU=False, rCond=1e-9):
    """
    Computes the singular value decomposition of the RowMatrix.
    The given row matrix A of dimension (m X n) is decomposed into U * s * V' where
    * s: DenseVector consisting of the square roots of the eigenvalues (singular values) in descending order.
    * U: (m X k) (left singular vectors) is a RowMatrix whose columns are the eigenvectors of (A X A')
    * V: (n X k) (right singular vectors) is a Matrix whose columns are the eigenvectors of (A' X A)
    :param k: number of singular values to keep. We might return fewer than k if there are numerically zero singular values.
    :param computeU: Whether or not to compute U. If set to be True, then U is computed by A * V * sigma^-1
    :param rCond: the reciprocal condition number. All singular values smaller than rCond * sigma(0) are treated as zero, where sigma(0) is the largest singular value.
    :returns: SVD object
    """
    java_model = row_matrix._java_matrix_wrapper.call("computeSVD", int(k), computeU, float(rCond))
    return SVD(java_model)

Now, let's apply it to an example:
from pyspark.ml.feature import *
from pyspark.mllib.linalg import Vectors

data = [(Vectors.dense([0.0, 1.0, 0.0, 7.0, 0.0]),), (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),), (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = sqlContext.createDataFrame(data,["features"])

pca_extracted = PCA(k=2, inputCol="features", outputCol="pca_features")

model = pca_extracted.fit(df)
features = model.transform(df) # this creates a DataFrame with the regular features and pca_features

# We can now extract the pca_features to prepare our RowMatrix.
pca_features = features.select("pca_features").rdd.map(lambda row : row[0])
mat = RowMatrix(pca_features)

# Once the RowMatrix is ready we can compute our Singular Value Decomposition
svd = computeSVD(mat,2,True)
svd.s
# DenseVector([9.491, 4.6253])
svd.U.rows.collect()
# [DenseVector([0.1129, -0.909]), DenseVector([0.463, 0.4055]), DenseVector([0.8792, -0.0968])]
svd.V
# DenseMatrix(2, 2, [-0.8025, -0.5967, -0.5967, 0.8025], 0)
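Side note (not part of the original answer): starting from Spark 2.2.0, per SPARK-6227, RowMatrix exposes computeSVD directly, so the wrapper above is no longer needed:

svd = mat.computeSVD(2, computeU=True)
svd.s  # as above
svd.V  # as above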

Have you considered a PR? - zero323
@zero323 Yes, but it seems there is already a PR for it, if I'm not mistaken. - eliasah
@zero323 Have a look at the ticket I opened based on this question, and the related PR: https://issues.apache.org/jira/browse/SPARK-11530 - desertnaut

5
The easiest answer to your question is to feed an identity matrix to your model.
identity_input = [(Vectors.dense([1.0, 0.0, 0.0, 0.0, 0.0]),),
                  (Vectors.dense([0.0, 1.0, 0.0, 0.0, 0.0]),),
                  (Vectors.dense([0.0, 0.0, 1.0, 0.0, 0.0]),),
                  (Vectors.dense([0.0, 0.0, 0.0, 1.0, 0.0]),),
                  (Vectors.dense([0.0, 0.0, 0.0, 0.0, 1.0]),)]
df_identity = sqlContext.createDataFrame(identity_input,["features"])
identity_features = model.transform(df_identity)

This should give you the principal components.
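A small sketch (not part of the original answer) of how one might gather those rows into a d x k loadings matrix, assuming the identity_features DataFrame defined above:

import numpy as np

loadings = np.array([row.pca_features.toArray()
                     for row in identity_features.select("pca_features").collect()])
# row i holds the loadings of original feature i on the k principal components
# (the tiny DataFrame is not shuffled here, so rows come back in input order)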

I think eliasah's answer is better in terms of the Spark framework, because desertnaut solves the problem using numpy functions instead of Spark actions. However, eliasah's answer is missing normalization of the data. So, I would add the following lines to eliasah's answer:

from pyspark.ml.feature import StandardScaler
standardizer = StandardScaler(withMean=True, withStd=False,
                              inputCol='features',
                              outputCol='std_features')
model = standardizer.fit(df)
output = model.transform(df)
pca_features = output.select("std_features").rdd.map(lambda row : row[0])
mat = RowMatrix(pca_features)
svd = computeSVD(mat,5,True)
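As a follow-up (not part of the original answer): once the data have been mean-centered as above, the explained-variance ratios can also be recovered from the singular values, since the covariance eigenvalues are proportional to the squared singular values; with k=5 all of them are kept here, so the ratios are simply:

s = svd.s.toArray()
s ** 2 / (s ** 2).sum()  # per-component explained-variance ratios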

Eventually, svd.V and identity_features.select("pca_features").collect() should have identical values.
I have summarized PCA and its use in Spark and sklearn in this blog post.

Thank you for not mentioning me in your paper! I believe that is the code from my answer. - eliasah
I cited your code with the link in the comments, and I didn't know your name. If you would like me to use another kind of acknowledgement, let me know. Also, this is not a paper; it is just a write-up a friend and I prepared to help people understand things. - sergulaydore
Still, I would rather be cited when my work is involved. I would do the same if I were using yours. It is part of the community collaboration rules and of the StackOverflow license. You can also find my contact details in my SO profile. I am usually very friendly ;-) - eliasah
OK, I will update the write-up and re-share it. Thanks for the heads-up. - sergulaydore

2

In Spark 2.2+, you can now easily get the explained variance as follows:

from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=<columns of your original dataframe>, outputCol="features")
df = assembler.transform(<your original dataframe>).select("features")
from pyspark.ml.feature import PCA
pca = PCA(k=10, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)
sum(model.explainedVariance)

Sorry for downvoting this; the question was more about how to identify the columns together with their explained variance, rather than just extracting the explained variance itself; that's not directly implied by the question, but I'm pretty sure that was the purpose. - Amith Adiraju
