PySpark计算相关性

16

我想使用 pyspark.mllib.stat.Statistics.corr 函数来计算 pyspark.sql.dataframe.DataFrame 对象中两列之间的相关性。 corr 函数希望接受一个 Vectors 对象的 rdd。如何将 df['some_name'] 列转换为 Vectors.dense 对象的 rdd

4个回答

30

没有必要这样做。对于数值型数据,您可以直接使用 DataFrameStatFunctions.corr 计算相关性:

df1 = sc.parallelize([(0.0, 1.0), (1.0, 0.0)]).toDF(["x", "y"])
df1.stat.corr("x", "y")
# -1.0

否则,您可以使用VectorAssembler

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=df.columns, outputCol="features")
assembler.transform(df).select("features").flatMap(lambda x: x)

3
仅支持皮尔逊相关性。 - VJune
尝试使用VectorAssembler时,我首先遇到了(“DataFrame”对象没有“flatMap”属性)的问题。然后在select('features')后添加.rdd。现在在收集返回的RDD时出现(Py4JJavaError:调用org.apache.spark.api.python.PythonRDD.collectAndServe时发生错误。 :org.apache.spark.SparkException:由于阶段失败而中止作业:第48.0个阶段中的任务0失败1次,最近一次失败), - Vaibhav
这里找到的解决方案有所帮助:https://dev59.com/wVQK5IYBdhLWcg3wdPiI - Vaibhav
这将返回一个没有列名的矩阵。有没有办法分别指定列名?另外,我们能否将其转换为不同的形式以便导出?我知道如果转换为pandas DF,则可以提供列名,但转换速度较慢 :/ - gamer

6

df.stat.corr("column1","column2")


3
from pyspark.ml.stat import Correlation
from pyspark.ml.linalg import DenseMatrix, Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import *

# Loading Data with more than 50 features
newdata = spark.read.csv("sample*.csv",inferSchema=True,header=True)

assembler = VectorAssembler(inputCols=newdata.columns, 
outputCol="features",handleInvalid='keep')
df = assembler.transform(newdata).select("features")

# correlation will be in Dense Matrix
correlation = Correlation.corr(df,"features","pearson").collect()[0][0]

# To convert Dense Matrix into DataFrame
rows = correlation.toArray().tolist()
df = spark.createDataFrame(rows,newdata.columns)

1
好的,我弄明白了:
v1 = df.flatMap(lambda x: Vectors.dense(x[col_idx_1]))
v2 = df.flatMap(lambda x: Vectors.dense(x[col_idx_2])) 

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接