使用 withColumnRenamed 重命名多列

79
```

我想使用spark的withColumnRenamed函数更改两个列的名称。当然,我可以这样写:

```
data = sqlContext.createDataFrame([(1,2), (3,4)], ['x1', 'x2'])
data = (data
       .withColumnRenamed('x1','x3')
       .withColumnRenamed('x2', 'x4'))

但是我想一步完成这个操作(拥有新名称的列表/元组)。不幸的是,即使是这样:

data = data.withColumnRenamed(['x1', 'x2'], ['x3', 'x4'])

也不是这个:

data = data.withColumnRenamed(('x1', 'x2'), ('x3', 'x4'))

正在工作。用这种方式做可能吗?


4
已接受的答案很高效,但要注意其他建议多次调用withColumnRenamed的答案。出于此博客文章中概述的原因,应避免使用withColumnRenamed方法。请参阅我的答案以获取更多详细信息。 - Powers
12个回答

124

无法使用单个 withColumnRenamed 调用。

  • 您可以使用DataFrame.toDF 方法*

    data.toDF('x3', 'x4')
    
    或者
    new_names = ['x3', 'x4']
    data.toDF(*new_names)
    
    也可以使用简单的 select 重命名:
  • from pyspark.sql.functions import col
    
    mapping = dict(zip(['x1', 'x2'], ['x3', 'x4']))
    data.select([col(c).alias(mapping.get(c, c)) for c in data.columns])
    
    同样,在Scala中你可以:
    • 重命名所有列:

    val newNames = Seq("x3", "x4")
    
    data.toDF(newNames: _*)
    
  • 使用select重命名映射:

  • val  mapping = Map("x1" -> "x3", "x2" -> "x4")
    
    df.select(
      df.columns.map(c => df(c).alias(mapping.get(c).getOrElse(c))): _*
    )
    

    或者使用foldLeft+withColumnRenamed

    mapping.foldLeft(data){
      case (data, (oldName, newName)) => data.withColumnRenamed(oldName, newName) 
    }
    

* 不要将其与RDD.toDF混淆,后者不是可变参数函数,而是接受列名列表的函数。


在您的第三个示例中,data.select([col(c).alias(mapping.get(c, c)) for c in data.columns]):如果您使用方法链接,您将如何编写data.columns(因此类似于col但引用数据框架)? - corianne1234
在Spark(Scala/Python)中,使用df.select是正确的方法。请查看此链接:https://dev59.com/U1gQ5IYBdhLWcg3wMhBJ#62728542 - Krunal Patel

30

我也找不到一个简单的PySpark解决方案,所以我自己构建了一个类似于pandas的 df.rename(columns={'old_name_1':'new_name_1', 'old_name_2':'new_name_2'})

import pyspark.sql.functions as F

def rename_columns(df, columns):
    if isinstance(columns, dict):
        return df.select(*[F.col(col_name).alias(columns.get(col_name, col_name)) for col_name in df.columns])
    else:
        raise ValueError("'columns' should be a dict, like {'old_name_1':'new_name_1', 'old_name_2':'new_name_2'}")

所以你的解决方案将会像这样 data = rename_columns(data, {'x1': 'x3', 'x2': 'x4'})

如果你想链接你的方法调用,Spark 3.0 引入了 pyspark.sql.DataFrame.transform,你可以按照以下方式使用:

my_df.transform(lambda df: rename_columns(df, {'old_name_1':'new_name_1', 'old_name_2':'new_name_2'}))

它为我节省了一些代码,希望它也能帮助你。


2
在三年后更新此回复,提供了一种更高效的解决方案。在修复了使用.forColumnRenamed()进行循环的许多作业的性能问题之后,我意识到这一点。对于延迟表示歉意。 - proggeo

18

为什么你希望将其在一行中执行? 如果你打印执行计划,它实际上只是在单行中执行。

data = spark.createDataFrame([(1,2), (3,4)], ['x1', 'x2'])
data = (data
   .withColumnRenamed('x1','x3')
   .withColumnRenamed('x2', 'x4'))
data.explain()

输出

== Physical Plan ==
*(1) Project [x1#1548L AS x3#1552L, x2#1549L AS x4#1555L]
+- Scan ExistingRDD[x1#1548L,x2#1549L]

如果你想使用列表的元组来完成此操作,可以使用简单的map函数

data = spark.createDataFrame([(1,2), (3,4)], ['x1', 'x2'])
new_names = [("x1","x3"),("x2","x4")]
data = data.select(list(
       map(lambda old,new:F.col(old).alias(new),*zip(*new_names))
       ))

data.explain()

仍然有相同的计划

== Physical Plan ==
*(1) Project [x1#1650L AS x3#1654L, x2#1651L AS x4#1655L]
+- Scan ExistingRDD[x1#1650L,x2#1651L]

10
你也可以使用 字典(Dictionary) 来遍历你想要重命名的列。 示例
a_dict = {'sum_gb': 'sum_mbUsed', 'number_call': 'sum_call_date'}

for key, value in a_dict.items():
    df= df.withColumnRenamed(value,key)

9
这应该是可行的,如果您想使用相同的列名和前缀重命名多个列。
df.select([f.col(c).alias(PREFIX + c) for c in df.columns])

1
你编写了 for c in columns,而不是 df.columns。这导致我出错了,但如果它能够工作的话(用于方法链接),它会非常有用。你是如何让它工作的? - corianne1234
1
@corianne1234 如果要链接和更改列名,请使用 transform df.transform(lambda df2: df2.select([col(acol).alias(acol + '_tmp') for acol in df2.columns])) - pettinato

6

我在所有的pyspark程序中都使用了这个技巧:

import pyspark
def rename_sdf(df, mapper={}, **kwargs_mapper):
    ''' Rename column names of a dataframe
        mapper: a dict mapping from the old column names to new names
        Usage:
            df.rename({'old_col_name': 'new_col_name', 'old_col_name2': 'new_col_name2'})
            df.rename(old_col_name=new_col_name)
    '''
    for before, after in mapper.items():
        df = df.withColumnRenamed(before, after)
    for before, after in kwargs_mapper.items():
        df = df.withColumnRenamed(before, after)
    return df
pyspark.sql.dataframe.DataFrame.rename = rename_sdf

现在您可以像使用pandas一样轻松地重命名任何Spark DataFrame了!

df.rename({'old1':'new1', 'old2':'new2'})

5

自从 pyspark 3.4.0 版本以后,你可以使用 withColumnsRenamed() 方法一次性重命名多个列。它的输入是一个包含现有列名和对应期望列名的映射。

df = df.withColumnsRenamed({
    "x1": "x3",
    "x2": "x4"
})

该方法同时重命名两个列。请注意,如果当前数据框架模式中不存在某个列(例如"x1"),则不会抛出错误。相反,它将被简单地忽略。


3

zero323的回答非常高效。其他大多数答案应该避免使用。

以下是另一种高效的解决方案,利用了 quinn 库,非常适合生产代码库:

df = spark.createDataFrame([(1,2), (3,4)], ['x1', 'x2'])
def rename_col(s):
    mapping = {'x1': 'x3', 'x2': 'x4'}
    return mapping[s]
actual_df = df.transform(quinn.with_columns_renamed(rename_col))
actual_df.show()

这是输出的DataFrame:

+---+---+
| x3| x4|
+---+---+
|  1|  2|
|  3|  4|
+---+---+

让我们查看使用 actual_df.explain(True) 输出的逻辑计划并验证其是否高效:

== Parsed Logical Plan ==
'Project ['x1 AS x3#52, 'x2 AS x4#53]
+- LogicalRDD [x1#48L, x2#49L], false

== Analyzed Logical Plan ==
x3: bigint, x4: bigint
Project [x1#48L AS x3#52L, x2#49L AS x4#53L]
+- LogicalRDD [x1#48L, x2#49L], false

== Optimized Logical Plan ==
Project [x1#48L AS x3#52L, x2#49L AS x4#53L]
+- LogicalRDD [x1#48L, x2#49L], false

== Physical Plan ==
*(1) Project [x1#48L AS x3#52L, x2#49L AS x4#53L]

解析后的逻辑计划和物理计划基本相等,因此 Catalyst 并不需要进行大量的优化工作。

应避免多次调用withColumnRenamed,因为它会创建一个低效的解析计划,需要进行优化。

让我们来看一个不必要复杂的解析计划:

def rename_columns(df, columns):
    for old_name, new_name in columns.items():
        df = df.withColumnRenamed(old_name, new_name)
    return df

def rename_col(s):
    mapping = {'x1': 'x3', 'x2': 'x4'}
    return mapping[s]
actual_df = rename_columns(df, {'x1': 'x3', 'x2': 'x4'})
actual_df.explain(True)

== Parsed Logical Plan ==
Project [x3#52L, x2#49L AS x4#55L]
+- Project [x1#48L AS x3#52L, x2#49L]
   +- LogicalRDD [x1#48L, x2#49L], false

== Analyzed Logical Plan ==
x3: bigint, x4: bigint
Project [x3#52L, x2#49L AS x4#55L]
+- Project [x1#48L AS x3#52L, x2#49L]
   +- LogicalRDD [x1#48L, x2#49L], false

== Optimized Logical Plan ==
Project [x1#48L AS x3#52L, x2#49L AS x4#55L]
+- LogicalRDD [x1#48L, x2#49L], false

== Physical Plan ==
*(1) Project [x1#48L AS x3#52L, x2#49L AS x4#55L]

1

你应该使用这个函数:

def spark_rename_from_dict(df, rename_dict):
    newcols = [rename_dict.get(i,i) for i in df.columns]
    df = df.toDF(*newcols)

在这里,你的重命名字典是对 df.columns 子集的映射。这种方法被推荐,因为它不会创建多个数据帧。


0

最简单的方法如下:

解释:

  1. 使用df.columns获取pyspark dataframe中的所有列
  2. 创建一个循环步骤1中每个列的列表
  3. 仅对所需列执行此操作,例如:col("col1").alias("col1_x")
  4. *[list] 将拆包列表以用于pypsark的select语句

from pyspark.sql import functions as F (df .select(*[F.col(c).alias(f"{c}_x") for c in df.columns]) .toPandas().head() )

希望这可以帮助你


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接