使用 PySpark DataFrame 聚合函数重命名列

111

我正在使用PySpark DataFrames分析一些数据。假设我有一个DataFrame df需要进行聚合:

(df.groupBy("group")
   .agg({"money":"sum"})
   .show(100)
)

这将会给我:

group                SUM(money#2L)
A                    137461285853
B                    172185566943
C                    271179590646

聚合函数可以正常工作,但我不喜欢新的列名SUM(money#2L)。是否有一种方法可以从.agg 方法中将此列重命名为更易读的名称?也许更类似于在 dplyr 中做的事情:

df %>% group_by(group) %>% summarise(sum_money = sum(money))
11个回答

201

尽管我仍然更喜欢 dplyr 的语法,但这段代码片段也可以:

import pyspark.sql.functions as sf

(df.groupBy("group")
   .agg(sf.sum('money').alias('money'))
   .show(100))

它会变得啰嗦。


29
如果其他人复制粘贴了这个“alias”部分但是没有生效,请注意你的括号。alias('string')存在于agg内部,否则你将别名化整个DataFrame而不仅仅是列。 - matrixanomaly

89

withColumnRenamed方法可以达到重命名列的目的。下面是pyspark.sql API链接。

df.groupBy("group")\
  .agg({"money":"sum"})\
  .withColumnRenamed("SUM(money)", "money")
  .show(100)

3
“alias”是一个好的指针,但这才是正确的答案——有时使用agg内的字典是有好处的,而且似乎唯一的“别名”聚合列的方法就是重命名它。 - Hendrik F
请使用 withColumnRenamed 而不是 alias。为什么?因为“分而治之”比过度加载大脑更有效。谢谢! - Curious Watcher

8
我为此编写了一个小的辅助函数,可以帮助一些人。
import re

from functools import partial

def rename_cols(agg_df, ignore_first_n=1):
    """changes the default spark aggregate names `avg(colname)` 
    to something a bit more useful. Pass an aggregated dataframe
    and the number of aggregation columns to ignore.
    """
    delimiters = "(", ")"
    split_pattern = '|'.join(map(re.escape, delimiters))
    splitter = partial(re.split, split_pattern)
    split_agg = lambda x: '_'.join(splitter(x))[0:-ignore_first_n]
    renamed = map(split_agg, agg_df.columns[ignore_first_n:])
    renamed = zip(agg_df.columns[ignore_first_n:], renamed)
    for old, new in renamed:
        agg_df = agg_df.withColumnRenamed(old, new)
    return agg_df

一个例子:
gb = (df.selectExpr("id", "rank", "rate", "price", "clicks")
 .groupby("id")
 .agg({"rank": "mean",
       "*": "count",
       "rate": "mean", 
       "price": "mean", 
       "clicks": "mean", 
       })
)

>>> gb.columns
['id',
 'avg(rate)',
 'count(1)',
 'avg(price)',
 'avg(rank)',
 'avg(clicks)']

>>> rename_cols(gb).columns
['id',
 'avg_rate',
 'count_1',
 'avg_price',
 'avg_rank',
 'avg_clicks']

至少做一些事情来减少人们的打字量。


2
非常有用且及时。我正想问同样的问题。如果您可以在agg字典中指定一个新的列名(在Spark中),那就太好了。 - Evan Zamir
@EvanZamir 谢谢!我可能会尝试在Spark中为此提交一个简单的PR。 - binaryaaron
你可以通过 df = df.toDF(*newColumnNames) 来简单地重命名DataFrame(df)的所有列名,其中 newColumnNames 包含了所有新的列名 :) - Markus
嗨,我进行了以下修改,因为使用 ignore_first_n=2 会截断列的最后一个字母:split_agg = lambda x: '_'.join(splitter(x))[:-1] - massigarg

7

很简单:

 val maxVideoLenPerItemDf = requiredItemsFiltered.groupBy("itemId").agg(max("playBackDuration").as("customVideoLength"))
maxVideoLenPerItemDf.show()

在聚合中使用.as来命名新创建的行。


6
从 PySpark 2.4.0 开始,.as('new_name') 应该替换为 .alias('new_name') - RyanLeiTaiwan

6

.alias.withColumnRenamed 都可以在硬编码列名的情况下使用。如果您需要一个编程解决方案,例如对所有剩余列进行聚合并提供更友好的名称,则此方法提供了一个良好的起点:

grouping_column = 'group'
cols = [F.sum(F.col(x)).alias(x) for x in df.columns if x != grouping_column]
(
    df
    .groupBy(grouping_column)
    .agg(
        *cols
    )
)

这段代码太棒了!这就是正确的做法。应该得到更多的赞。 - NatalieL

4
df = df.groupby('Device_ID').agg(aggregate_methods)
for column in df.columns:
    start_index = column.find('(')
    end_index = column.find(')')
    if (start_index and end_index):
        df = df.withColumnRenamed(column, column[start_index+1:end_index])

上述代码可以去除括号外的任何内容。例如,"sum(foo)"将被重命名为"foo"。

只需注意没有括号的列,它们将被完全删除,例如groupby变量。可以添加if/continue检查。我有一个单一的变量作为我的groupby变量,所以只需检查它即可。 - statHacker

4
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder.appName('test').getOrCreate()
data = [(1, "siva", 100), (2, "siva2", 200),(3, "siva3", 300),(4, "siva4", 400),(5, "siva5", 500)]
schema = ['id', 'name', 'sallary']

df = spark.createDataFrame(data, schema=schema)
df.show()
+---+-----+-------+
| id| name|sallary|
+---+-----+-------+
|  1| siva|    100|
|  2|siva2|    200|
|  3|siva3|    300|
|  4|siva4|    400|
|  5|siva5|    500|
+---+-----+-------+


**df.agg({"sallary": "max"}).withColumnRenamed('max(sallary)', 'max').show()**
+---+
|max|
+---+
|500|
+---+

3

虽然之前给出的答案不错,但我认为它们缺乏一种处理.agg()中词典用法的简洁方式。

如果你想使用一个词典,而且这个词典可能是动态生成的,因为你有数百列数据,你可以使用以下方法而无需处理大量的代码行:

# Your dictionary-version of using the .agg()-function
# Note: The provided logic could actually also be applied to a non-dictionary approach
df = df.groupBy("group")\
   .agg({
          "money":"sum"
        , "...":  "..."
    })

# Now do the renaming
newColumnNames = ["group", "money", "..."] # Provide the names for ALL columns of the new df
df = df.toDF(*newColumnNames)              # Do the renaming

当然,“newColumnNames”列表也可以动态生成。例如,如果您只向聚合中添加列到您的“df”,则可以预先存储“newColumnNames = df.columns”,然后只需添加额外的名称即可。
无论如何,请注意,“newColumnNames”必须包含数据帧的所有列名,而不仅仅是要重命名的列名(因为由于Spark的不可变RDD,“.toDF()”创建了一个新的数据帧)!

0

另一个快速的一行代码添加到混合中:

df.groupBy('group')
  .agg({'money':'sum',
        'moreMoney':'sum',
        'evenMoreMoney':'sum'
        })
    .select(*(col(i).alias(i.replace("(",'_').replace(')','')) for i in df.columns))

只需将别名函数更改为您想要命名的任何内容即可。以上代码会生成sum_money、sum_moreMoney,因为我喜欢在变量名中看到运算符。


0
【特殊情况】
如果我们想要将聚合列的名称重命名为被求和的列的名称(即:sum(column1) --> column1),我们可以这样做:
import pyspark.sql.functions as F

groupby_keys = ["categorical_column_1", "categorical_column_2"]
numerical_columns = ["numerical_column_1", "numerical_column_2"]

aggregation_computations = [F.sum(col).alias(col) for col in numerical_columns]
df = df.groupby(groupby_keys).agg(*aggregation_computations)

df.show()

+----------------------+----------------------+--------------------+--------------------+
| categorical_column_1 | categorical_column_2 | numerical_column_1 | numerical_column_2 |
+----------------------+----------------------+--------------------+--------------------+
|     category_1_1     |     category_2_1     |           1        |          1.0       |
|     category_1_2     |     category_2_1     |           2        |          2.0       |
|     category_1_1     |     category_2_2     |           3        |          3.0       |
|     category_1_2     |     category_2_2     |           4        |          4.0       |
+----------------------+----------------------+--------------------+--------------------+

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接