从Pandas的groupBy到PySpark的groupBy

4
考虑一个Spark DataFrame,其中有几列。目标是在不将其转换为Pandas DataFrame的情况下执行groupBy操作。等价的Pandas groupBy代码如下:
def compute_metrics(x):
    return pd.Series({
        'a': x['a'].values[0],
        'new_b': np.sum(x['b']),
        'c': np.mean(x['c']),
        'cnt': len(x)
    })

data.groupby([
    'col_1',
    'col_2'
]).apply(compute_metrics).reset_index()

我打算使用PySpark编写这个程序。目前,我已经在 PySpark 中得到了以下代码:

gdf = df.groupBy([
    'col_1',
    'col_2'
]).agg({
    'c': 'avg',
    'b': 'sum'
}).withColumnRenamed('sum(b)', 'new_b')

然而,我不确定如何处理 'a': x['a'].values[0]'cnt': len(x)。我考虑使用 from pyspark.sql import functions 中的 collect_list,但是它会返回 Column object is not Callable。你有什么想法来完成上述转换吗?谢谢!

[更新] 是否对任何列执行 count 操作以获取 cnt 有意义?比如我这样做:

gdf = df.groupBy([
    'col_1',
    'col_2'
]).agg({
    'c': 'avg',
    'b': 'sum',
    'some_column': 'count'
}).withColumnRenamed('sum(b)', 'new_b')
  .withColumnRenamed('count(some_column)', 'cnt')
1个回答

6
我使用 PySpark 函数 sumavgcountfirst 创建了这个玩具解决方案。请注意,此解决方案在 Spark 2.1 中使用。希望这可以帮到您!
from pyspark.sql.functions import sum, avg, count, first

# create toy example dataframe with column 'A', 'B' and 'C'
ls = [['a', 'b',3], ['a', 'b', 4], ['a', 'c', 3], ['b', 'b', 5]]
df = spark.createDataFrame(ls, schema=['A', 'B', 'C'])

# group by column 'A' and 'B' then performing some function here
group_df = df.groupby(['A', 'B'])
df_grouped = group_df.agg(sum("C").alias("sumC"), 
                          avg("C").alias("avgC"), 
                          count("C").alias("countC"), 
                          first("C").alias("firstC"))
df_grouped.show() # print out the spark dataframe

谢谢!你的解决方案比我的丑陋代码更优雅。我仍然在思考该怎么处理 'a': x['a'].values[0] - Kevin Ghaboosi
在这种情况下,values[0]是什么@KevinGhaboosi?它是按分组排序后的第一个元素吗? - titipata
这里需要注意的是,对于更复杂的函数,您可以编写用户定义函数(UDF),以便在分组后应用。 - titipata
感谢@titipat的评论。是的,那是正确的! - Kevin Ghaboosi
有没有一种类似于pandas apply()的方法,可以将整个子数据框传递并通过另一个函数进行操作? - kuanb

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接