考虑一个Spark DataFrame,其中有几列。目标是在不将其转换为Pandas DataFrame的情况下执行groupBy操作。等价的Pandas groupBy代码如下:
def compute_metrics(x):
return pd.Series({
'a': x['a'].values[0],
'new_b': np.sum(x['b']),
'c': np.mean(x['c']),
'cnt': len(x)
})
data.groupby([
'col_1',
'col_2'
]).apply(compute_metrics).reset_index()
我打算使用PySpark编写这个程序。目前,我已经在 PySpark
中得到了以下代码:
gdf = df.groupBy([
'col_1',
'col_2'
]).agg({
'c': 'avg',
'b': 'sum'
}).withColumnRenamed('sum(b)', 'new_b')
然而,我不确定如何处理 'a': x['a'].values[0]
和 'cnt': len(x)
。我考虑使用 from pyspark.sql import functions
中的 collect_list
,但是它会返回 Column object is not Callable
。你有什么想法来完成上述转换吗?谢谢!
[更新] 是否对任何列执行 count
操作以获取 cnt
有意义?比如我这样做:
gdf = df.groupBy([
'col_1',
'col_2'
]).agg({
'c': 'avg',
'b': 'sum',
'some_column': 'count'
}).withColumnRenamed('sum(b)', 'new_b')
.withColumnRenamed('count(some_column)', 'cnt')
'a': x['a'].values[0]
。 - Kevin Ghaboosivalues[0]
是什么@KevinGhaboosi?它是按分组排序后的第一个元素吗? - titipataapply()
的方法,可以将整个子数据框传递并通过另一个函数进行操作? - kuanb