在对PySpark DataFrame进行分组后如何使用describe函数?

9

我希望找到将describe函数应用于分组数据框的最清晰的方法(这个问题也可以扩展到将任何DF函数应用于分组DF)。

我尝试了分组聚合的pandas UDF,但没有成功。总有一种方法是通过在agg函数中传递每个统计量来完成,但这不是正确的方法。

如果我们有一个示例数据框:

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

我的想法是做一些类似于Pandas的事情:

df.groupby("id").describe()

将得到以下结果:

                                                   v
    count mean     std    min   25%    50%  75%   max
id                              
1   2.0   1.5   0.707107  1.0   1.25   1.5  1.75  2.0
2   3.0   6.0   3.605551  3.0   4.00   5.0  7.50  10.0

谢谢。

没有绕过将每个统计量传递给“agg”函数的方法。这是正确(也是唯一)的方法。 - pault
val df = myDataFrame.describe(); df: org.apache.spark.sql.DataFrame = [summary: string, count: string]; df.show() - Dinesh Kumar
3个回答

7

试试这个:

df = (df
    .groupby("id")
    .agg(F.count('v').alias('count'),
         F.mean('v').alias('mean'),
         F.stddev('v').alias('std'),
         F.min('v').alias('min'),
         F.expr('percentile(v, array(0.25))')[0].alias('%25'),
         F.expr('percentile(v, array(0.5))')[0].alias('%50'),
         F.expr('percentile(v, array(0.75))')[0].alias('%75'),
         F.max('v').alias('max')))
df.show()

输出:

+---+-----+----+------------------+---+----+---+----+----+
| id|count|mean|               std|min| %25|%50| %75| max|
+---+-----+----+------------------+---+----+---+----+----+
|  1|    2| 1.5|0.7071067811865476|1.0|1.25|1.5|1.75| 2.0|
|  2|    3| 6.0| 3.605551275463989|3.0| 4.0|5.0| 7.5|10.0|
+---+-----+----+------------------+---+----+---+----+----+

1
这个可以运行,但我想有一种更简洁的方法来实现它。感谢您的帮助。 - Favio Vázquez

2
如果您有一个实用程序函数模块,可以将以下内容放入其中,然后在一行代码后调用它。
import pyspark.sql.functions as F

def groupby_apply_describe(df, groupby_col, stat_col):
    """From a grouby df object provide the stats
    of describe for each key in the groupby object.

    Parameters
    ----------
    df : spark dataframe groupby object
    col : column to compute statistics on
    
    """
    output = df.groupby(groupby_col).agg(
        F.count(stat_col).alias("count"),
        F.mean(stat_col).alias("mean"),
        F.stddev(stat_col).alias("std"),
        F.min(stat_col).alias("min"),
        F.expr(f"percentile({stat_col}, array(0.25))")[0].alias("%25"),
        F.expr(f"percentile({stat_col}, array(0.5))")[0].alias("%50"),
        F.expr(f"percentile({stat_col}, array(0.75))")[0].alias("%75"),
        F.max(stat_col).alias("max"),
    )
    print(output.orderBy(groupby_col).show())
    return output

在您的情况下,您可以调用groupby_apply_describe(df, 'id', 'v')。 输出应与您的要求匹配。

1

描述多列...

受之前的答案启发,但在spark/3.0.1中进行了测试。

import itertools as it
import pyspark.sql.functions as F
from functools import reduce

group_column = 'id'
metric_columns = ['v','v1','v2']

# You will have a dataframe with df variable

def spark_describe(group_col, stat_col):
    return df.groupby(group_col).agg(
        F.count(stat_col).alias(f"{stat_col}_count"),
        F.mean(stat_col).alias(f"{stat_col}_mean"),
        F.stddev(stat_col).alias(f"{stat_col}_std"),
        F.min(stat_col).alias(f"{stat_col}_min"),
        F.max(stat_col).alias(f"{stat_col}_max"),
        F.expr(f"percentile({stat_col}, array(0.25))")[0].alias(f"{stat_col}_25pct"),
        F.expr(f"percentile({stat_col}, array(0.5))")[0].alias(f"{stat_col}_50pct"),
        F.expr(f"percentile({stat_col}, array(0.75))")[0].alias(f"{stat_col}_75pct"),   
    )

_join = lambda a,b: a.join(b, group_column, 'inner')
dff = reduce(_join, list(map(lambda x: spark_describe(*x), zip(it.repeat(group_column, len(metric_columns)), metric_columns))))


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接