在Pyspark DataFrame中查找1万列的均值和相关性

4
我有一个包含1万列和7000万行的DF。我想计算这1万列的平均值和相关性。我尝试了以下代码,但由于代码大小达到了64K的限制而无法执行(https://issues.apache.org/jira/browse/SPARK-16845)。
数据:
region dept week sal val1  val2  val3 ... val10000   
 US    CS   1     1    2    1     1   ...  2 
 US    CS   2     1.5  2    3     1   ...  2
 US    CS   3     1    2    2     2.1      2
 US    ELE  1     1.1  2    2     2.1      2
 US    ELE  2     2.1  2    2     2.1      2
 US    ELE  3     1    2    1     2   .... 2
 UE    CS   1     2    2    1     2   .... 2

代码:

aggList =  [func.mean(col) for col in df.columns]  #exclude keys
df2= df.groupBy('region', 'dept').agg(*aggList)

代码 2

aggList =  [func.corr('sal', col).alias(col) for col in df.columns]  #exclude keys
df2  = df.groupBy('region', 'dept', 'week').agg(*aggList)

这个失败了。有没有其他的方法来解决这个错误?有人尝试过10K列的DF吗?有没有性能改进的建议?


3
10k列之间的相关性?这是一种成对测量方法,因此您预计会得到大约10^8个相关系数? - Martin Milichovsky
@Martin Milichovsky,抱歉我在相关函数中错过了第二列。我正在尝试找到列sal和val1、sal和value2……sal和value10000之间的相关性。如果您有任何问题,请告诉我。 - Harish
@MartinMilichovsky 我认为Harish只是想将每一列与“sal”相关联。因此没有组合爆炸。 - Joachim Rosskopf
1个回答

1
我们也遇到了64KB问题,但是在where子句中,这个问题已经被归档到另一个错误报告中。我们使用的解决方法很简单,就是将操作/转换分几步进行。
在你的情况下,这意味着你不要一步完成所有的聚合操作。相反,应该在外部操作中循环处理相关列。
  • 使用select创建一个临时数据框,该数据框仅包含您需要进行操作的列。
  • 像之前一样使用groupByagg,但不是为了聚合列表,而是只对一个(或两个,您可以将meancorr组合起来)。
  • 在接收到所有临时数据帧的引用后,使用withColumn将临时数据帧中的聚合列附加到结果df中。

由于Spark DAG的惰性评估,这当然比一次操作慢。 但它应该在一个运行中评估整个分析。


1
我有同样的计划...但问题在于我必须找到第1列与其余所有列之间的相关性。这意味着我必须进行10,000次相关性计算。因此,我觉得将其存储在10,000个临时数据框中会带来太多开销。还有一件事是我的值列是动态的。 - Harish
问题是,如果对于许多聚合和临时数据框而言的DAG看起来如此不同。如果你考虑底层的RDDs,那么它并没有太大的区别,因为洗牌的数量等都是相同的。 - Joachim Rosskopf
对于df中的每一列: df2 = df.groupBy('region', 'dept') .agg(func.mean(func.col(col)).alias(col + '_m')) df2 = df2.withColumn(col, func.col(col) - func.col(col + '_m')).drop(col + '_m') - Harish
上面的代码是这样的吗?这会循环10,000次吗?如果不是,请纠正我。 - Harish

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接