[UPDATE: As of Spark 2.2, both PCA and SVD are available in PySpark - see JIRA ticket SPARK-6227 and PCA & PCAModel for Spark ML 2.2; the original answer below is still applicable for older Spark versions.]
Well, it seems incredible, but indeed there is currently (at least as of Spark 1.5) no way to extract such information from a PCA decomposition. But again, there have been many similar "complaints" - see here, for example, about not being able to extract the best parameters from a CrossValidatorModel.
Fortunately, some months ago I attended the 'Scalable Machine Learning' MOOC by AMPLab (Berkeley) & Databricks, i.e. the creators of Spark, where we implemented a full PCA pipeline 'by hand' as part of the homework assignments. I have modified my functions from back then (rest assured, I got full credit :-) so as to work with dataframes as inputs (instead of RDDs), of the same format as yours, i.e. Rows of DenseVectors containing the numerical features.
We first need to define an intermediate function, estimateCovariance, as follows:
import numpy as np

def estimateCovariance(df):
    """Compute the covariance matrix for a given dataframe.

    Note:
        The multi-dimensional covariance array should be calculated using outer
        products.  Don't forget to normalize the data by first subtracting the mean.

    Args:
        df:  A Spark dataframe with a column named 'features', which (column) consists of DenseVectors.

    Returns:
        np.ndarray: A multi-dimensional array where the number of rows and columns both equal the
            length of the arrays in the input dataframe.
    """
    m = df.select(df['features']).map(lambda x: x[0]).mean()
    dfZeroMean = df.select(df['features']).map(lambda x: x[0]).map(lambda x: x - m)  # subtract the mean

    return dfZeroMean.map(lambda x: np.outer(x, x)).sum() / df.count()
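As a quick local sanity check (outside Spark), the same computation can be mirrored in plain NumPy on the example data used further below; the outer-product average over the mean-centered rows should agree with NumPy's population covariance. This is just a verification sketch, not part of the pipeline:

```python
import numpy as np

# The example data from the Spark ML PCA documentation (3 rows, 5 features)
X = np.array([[0.0, 1.0, 0.0, 7.0, 0.0],
              [2.0, 0.0, 3.0, 4.0, 5.0],
              [4.0, 0.0, 0.0, 6.0, 7.0]])

# Same logic as estimateCovariance: subtract the column means,
# then average the outer products of the centered rows
m = X.mean(axis=0)
Xc = X - m
cov_outer = sum(np.outer(row, row) for row in Xc) / len(X)

# NumPy's population covariance (bias=True divides by n, not n-1)
cov_np = np.cov(X, rowvar=False, bias=True)
print(np.allclose(cov_outer, cov_np))  # True
```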
Then, we can write the main pca function as follows:
from numpy.linalg import eigh

def pca(df, k=2):
    """Computes the top `k` principal components, corresponding scores, and all eigenvalues.

    Note:
        All eigenvalues should be returned in sorted order (largest to smallest). `eigh` returns
        each eigenvector as a column.  This function should also return eigenvectors as columns.

    Args:
        df: A Spark dataframe with a 'features' column, which (column) consists of DenseVectors.
        k (int): The number of principal components to return.

    Returns:
        tuple of (np.ndarray, RDD of np.ndarray, np.ndarray): A tuple of (eigenvectors, `RDD` of
        scores, eigenvalues).  Eigenvectors is a multi-dimensional array where the number of
        rows equals the length of the arrays in the input `RDD` and the number of columns equals
        `k`.  The `RDD` of scores has the same number of rows as `data` and consists of arrays
        of length `k`.  Eigenvalues is an array of length d (the number of features).
    """
    cov = estimateCovariance(df)
    col = cov.shape[1]
    eigVals, eigVecs = eigh(cov)
    inds = np.argsort(eigVals)
    eigVecs = eigVecs.T[inds[-1:-(col+1):-1]]  # sort eigenvectors largest -> smallest
    components = eigVecs[0:k]
    eigVals = eigVals[inds[-1:-(col+1):-1]]    # sort eigenvalues largest -> smallest
    score = df.select(df['features']).map(lambda x: x[0]).map(lambda x: np.dot(x, components.T))

    return components.T, score, eigVals
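The eigendecomposition-and-sort logic above can also be mirrored locally in plain NumPy on the same example data, which makes the slicing trick `inds[-1:-(col+1):-1]` easier to see (it simply reverses `eigh`'s ascending order). A verification sketch, not the Spark code itself:

```python
import numpy as np
from numpy.linalg import eigh

# The example data from the Spark ML PCA documentation
X = np.array([[0.0, 1.0, 0.0, 7.0, 0.0],
              [2.0, 0.0, 3.0, 4.0, 5.0],
              [4.0, 0.0, 0.0, 6.0, 7.0]])

# Covariance of the mean-centered data (what estimateCovariance returns)
Xc = X - X.mean(axis=0)
cov = Xc.T.dot(Xc) / len(X)

# eigh returns eigenvalues in ascending order; reverse to get largest first,
# exactly as inds[-1:-(col+1):-1] does in pca()
eigVals, eigVecs = eigh(cov)
order = np.argsort(eigVals)[::-1]
eigVals = eigVals[order]
components = eigVecs[:, order][:, :2]  # top k=2 eigenvectors, as columns

# Scores: project the (uncentered) rows, exactly as pca() does
scores = X.dot(components)
```

Up to the sign ambiguity discussed below, the absolute values in `scores` match the `score.collect()` output shown further down.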
Test

Let's first see the results with the existing method, using the example data from the Spark ML PCA documentation (modifying them so as to be all DenseVectors):
from pyspark.ml.feature import *
from pyspark.mllib.linalg import Vectors
data = [(Vectors.dense([0.0, 1.0, 0.0, 7.0, 0.0]),),
(Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
(Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = sqlContext.createDataFrame(data,["features"])
pca_extracted = PCA(k=2, inputCol="features", outputCol="pca_features")
model = pca_extracted.fit(df)
model.transform(df).collect()
[Row(features=DenseVector([0.0, 1.0, 0.0, 7.0, 0.0]), pca_features=DenseVector([1.6486, -4.0133])),
Row(features=DenseVector([2.0, 0.0, 3.0, 4.0, 5.0]), pca_features=DenseVector([-4.6451, -1.1168])),
Row(features=DenseVector([4.0, 0.0, 0.0, 6.0, 7.0]), pca_features=DenseVector([-6.4289, -5.338]))]
Then, with our own method:
comp, score, eigVals = pca(df)
score.collect()
[array([ 1.64857282, 4.0132827 ]),
array([-4.64510433, 1.11679727]),
array([-6.42888054, 5.33795143])]
Let me stress that we don't use any collect() methods in the functions we have defined; score is an RDD, exactly as it should be.
Notice that the signs of our second column are all opposite to the ones derived with the existing method; but this is not an issue: according to the (freely downloadable) An Introduction to Statistical Learning, co-authored by Hastie & Tibshirani, p. 382:

Each principal component loading vector is unique, up to a sign flip. This means that two different software packages will yield the same principal component loading vectors, although the signs of those loading vectors may differ. The signs may differ because each principal component loading vector specifies a direction in p-dimensional space: flipping the sign has no effect as the direction does not change. [...] Similarly, the score vectors are unique up to a sign flip, since the variance of Z is the same as the variance of −Z.
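The last point of the quote is easy to verify directly: using the first-component scores from the output above, the variance is unchanged under a sign flip.

```python
import numpy as np

# First-component scores from the score.collect() output above
z = np.array([1.64857282, -4.64510433, -6.42888054])

# var(Z) == var(-Z): flipping the sign leaves the variance unchanged
print(np.isclose(np.var(z), np.var(-z)))  # True
```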
Finally, now that we have the eigenvalues available, it is trivial to write a function for the percentage of the variance explained:
def varianceExplained(df, k=1):
    """Calculate the fraction of variance explained by the top `k` eigenvectors.

    Args:
        df: A Spark dataframe with a 'features' column, which (column) consists of DenseVectors.
        k: The number of principal components to consider.

    Returns:
        float: A number between 0 and 1 representing the percentage of variance explained
            by the top `k` eigenvectors.
    """
    components, scores, eigenvalues = pca(df, k)
    return sum(eigenvalues[0:k]) / sum(eigenvalues)
varianceExplained(df,1)
As a test, we also check if, for k=5 (since the original data are 5-dimensional), the variance explained in our example data is indeed 1.0:
varianceExplained(df,5)
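The same identity can be checked locally with NumPy, since the explained-variance fraction depends only on the eigenvalues of the covariance matrix; a sketch with the same example data:

```python
import numpy as np
from numpy.linalg import eigh

X = np.array([[0.0, 1.0, 0.0, 7.0, 0.0],
              [2.0, 0.0, 3.0, 4.0, 5.0],
              [4.0, 0.0, 0.0, 6.0, 7.0]])
Xc = X - X.mean(axis=0)
eigVals = eigh(Xc.T.dot(Xc) / len(X))[0]   # ascending order
eigVals = eigVals[::-1]                    # largest first

# Explained-variance fractions for k = 1..5; the last entry is 1.0.
# (With only 3 rows, the centered data has rank 2, so k=2 already
# explains essentially all of the variance here.)
ratios = np.cumsum(eigVals) / np.sum(eigVals)
print(np.isclose(ratios[-1], 1.0))  # True
```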
[Developed & tested with Spark 1.5.0 & 1.5.1]