如何在PySpark中更改数据框列名?

330

我来自pandas背景,习惯于从CSV文件中读取数据到数据框并通过简单的命令将列名更改为有用的内容:

df.columns = new_column_name_list

然而,使用sqlContext创建的PySpark数据框不适用于相同的方法。 我能想到的唯一解决办法如下:
df = sqlContext.read.format("com.databricks.spark.csv").options(header='false', inferschema='true', delimiter='\t').load("data.txt")
oldSchema = df.schema
for i,k in enumerate(oldSchema.fields):
  k.name = new_column_name_list[i]
df = sqlContext.read.format("com.databricks.spark.csv").options(header='false', delimiter='\t').load("data.txt", schema=oldSchema)

这基本上是定义变量两次,首先推断模式,然后重命名列名,然后使用更新后的模式再次加载数据框。

是否有更好、更有效的方法来做到这一点,就像我们在Pandas中所做的那样?

我的Spark版本是1.5.0

25个回答

504

有许多方法可以做到这一点:

 data = sqlContext.createDataFrame([("Alberto", 2), ("Dakota", 2)], 
                                   ["Name", "askdaosdka"])
 data.show()
 data.printSchema()

 # Output
 #+-------+----------+
 #|   Name|askdaosdka|
 #+-------+----------+
 #|Alberto|         2|
 #| Dakota|         2|
 #+-------+----------+

 #root
 # |-- Name: string (nullable = true)
 # |-- askdaosdka: long (nullable = true)

 df = data.selectExpr("Name as name", "askdaosdka as age")
 df.show()
 df.printSchema()

 # Output
 #+-------+---+
 #|   name|age|
 #+-------+---+
 #|Alberto|  2|
 #| Dakota|  2|
 #+-------+---+

 #root
 # |-- name: string (nullable = true)
 # |-- age: long (nullable = true)
  • 选项2. 使用 withColumnRenamed,注意该方法允许您“覆盖”相同的列。对于Python3,请使用range替换xrange

  •  from functools import reduce
    
     oldColumns = data.schema.names
     newColumns = ["name", "age"]
    
     df = reduce(lambda data, idx: data.withColumnRenamed(oldColumns[idx], newColumns[idx]), xrange(len(oldColumns)), data)
     df.printSchema()
     df.show()
    
  • 选项3. 使用alias,在Scala中您也可以使用as

  •  from pyspark.sql.functions import col
    
     data = data.select(col("Name").alias("name"), col("askdaosdka").alias("age"))
     data.show()
    
     # Output
     #+-------+---+
     #|   name|age|
     #+-------+---+
     #|Alberto|  2|
     #| Dakota|  2|
     #+-------+---+
    
  • 选项 4. 使用 sqlContext.sql,该函数允许你在已经注册为表的DataFrames上使用 SQL 查询。

  •  sqlContext.registerDataFrameAsTable(data, "myTable")
     df2 = sqlContext.sql("SELECT Name AS name, askdaosdka as age from myTable")
    
     df2.show()
    
     # Output
     #+-------+---+
     #|   name|age|
     #+-------+---+
     #|Alberto|  2|
     #| Dakota|  2|
     #+-------+---+
    

    3
    我用了一个 for 循环和 withColumnRenamed 来完成,但是你的 reduce 选项非常好 :) - Felipe Gerard
    1
    既然在Spark中只有在DF上调用操作时才会执行任何操作,因此这只是不太优雅的代码...最终生成的DF完全相同! - Felipe Gerard
    2
    @FelipeGerard 请查看此帖子,如果你有很多列,可能会发生不好的事情。 - Alberto Bonsanto
    3
    @NuValue,你应该先运行from functools import reduce - joaofbsm
    2
    在 PySpark 2.4 中,使用 Python 3.6.8,其中唯一有效的方法是 df.select('id').withColumnRenamed('id', 'new_id')spark.sql("SELECT id AS new_id FROM df") - rjurney
    显示剩余13条评论

    296
    df = df.withColumnRenamed("colName", "newColName")\
           .withColumnRenamed("colName2", "newColName2")
    

    使用此方法的优点:当你有一个很长的列名列表但只想更改其中几个列名时,这种方法非常方便。在加入具有重复列名的表格时非常有用。


    3
    有没有一种方案可以让所有其他列保持不变?使用这种方法和其他方法,只有明确命名的列会保留下来(所有其他列都被删除)。 - Quetzalcoatl
    6
    +1 对我很有效,我只是编辑了指定的列而不改变其他的,也没有删除任何列。 - mnis.p
    5
    @Quetzalcoatl 这个命令似乎只是改变指定的列而保留所有其他列。因此,这是一个非常好的命令,可以重命名可能有许多列名中的其中一列。 - user989762
    1
    @user989762:同意;我最初的理解在这个问题上是错误的...! - Quetzalcoatl
    1
    这对于重命名几列非常有用。请参见我的答案,以获取可以编程重命名列的解决方案。假设您有200个列,并且您想要重命名其中50个具有某种类型的列名称,并保留其他150个不变。在这种情况下,您不希望手动运行withColumnRenamed(运行那么多次withColumnRenamed也是低效的,如此处所述)。 - Powers
    @Powers:我看了你的文章,但其中包含将所有字符串类型转换为整数的命令。你能否在这里写下如何使用你的Medium文章中的代码来重命名,比如只重命名上面答案中提到的两列? - Sheldore

    120
    如果你想改变所有列的名称,请尝试使用 df.toDF(*cols)

    14
    根据问题的原始作者所说,这个解决方案最接近于将df.columns设置为new_column_name_list,无论是在简洁性还是执行方面。 - Quetzalcoatl
    3
    我认为这应该被选为最佳答案。 - HanaKaze
    1
    对我来说,我是从pandas dataframe中获取标题名称的,所以我只需使用df = df.toDF(*my_pandas_df.columns) - Nic Scozzaro
    3
    这个回答让我感到困惑。难道旧列名不应该映射到新的列名吗?是通过将cols作为新的列名,并假设cols中的名称顺序对应于数据框中的列顺序来完成的吗? - rbatt
    如果您想传递列列表,则这显然是最佳答案。 - poiter
    1
    @rbatt 使用 df.selectpyspark.sql.functions col-method 结合使用是一种可靠的方法,因为它保持了应用的映射/别名,从而在重命名操作后保持了顺序/架构。请查看评论以获取代码片段:https://dev59.com/U1gQ5IYBdhLWcg3wMhBJ#62728542 - Krunal Patel

    79
    如果你想对所有列名应用简单的转换,这段代码可以实现:(我将所有空格替换为下划线)
    new_column_name_list= list(map(lambda x: x.replace(" ", "_"), df.columns))
    
    df = df.toDF(*new_column_name_list)
    

    感谢 @user8117731 提供的 toDf 技巧。


    2
    这段代码生成一个简单的物理计划,易于Catalyst优化。它还很优雅。+1 - Powers

    21

    df.withColumnRenamed('age', 'age2')


    2
    Pankaj Kumar的回答和Alberto Bonsanto的回答(分别于2016年和2015年发布),已经建议使用withColumnRenamed - Andrew Myers
    谢谢,是的,但有几种不同的语法,也许我们应该将它们收集到一个更正式的答案中?data.withColumnRenamed(oldColumns[idx], newColumns[idx]) 与 data.withColumnRenamed(columnname, new columnname)我认为这取决于您使用的Pyspark版本。 - Sahan Jayasumana
    3
    这不是一种不同的语法。唯一的区别是您没有将列名存储在数组中。 - Ed Bordin

    20

    如果您想重命名单个列并保持其余部分不变:

    from pyspark.sql.functions import col
    new_df = old_df.select(*[col(s).alias(new_name) if s == column_to_change else s for s in old_df.columns])
    

    18

    这是我所采用的方法:

    创建 PySpark 会话:

    import pyspark
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName('changeColNames').getOrCreate()
    

    创建数据框:

    df = spark.createDataFrame(data = [('Bob', 5.62,'juice'),  ('Sue',0.85,'milk')], schema = ["Name", "Amount","Item"])
    

    使用列名查看数据框:

    df.show()
    +----+------+-----+
    |Name|Amount| Item|
    +----+------+-----+
    | Bob|  5.62|juice|
    | Sue|  0.85| milk|
    +----+------+-----+
    
    创建一个带有新列名称的列表:
    newcolnames = ['NameNew','AmountNew','ItemNew']
    

    更改数据框的列名:

    for c,n in zip(df.columns,newcolnames):
        df=df.withColumnRenamed(c,n)
    

    使用新列名查看数据框:

    df.show()
    +-------+---------+-------+
    |NameNew|AmountNew|ItemNew|
    +-------+---------+-------+
    |    Bob|     5.62|  juice|
    |    Sue|     0.85|   milk|
    +-------+---------+-------+
    

    14

    我创建了一个易于使用的函数,用于为Pyspark数据框重命名多个列,如果有人需要使用它:

    def renameCols(df, old_columns, new_columns):
        for old_col,new_col in zip(old_columns,new_columns):
            df = df.withColumnRenamed(old_col,new_col)
        return df
    
    old_columns = ['old_name1','old_name2']
    new_columns = ['new_name1', 'new_name2']
    df_renamed = renameCols(df, old_columns, new_columns)
    

    注意,两个列表必须具有相同的长度。


    1
    做得不错。但对于我所需的有点过头了。你可以直接传递df,因为“old_columns”与“df.columns”相同。 - Darth Egregious

    11

    另一种重命名单个列的方法(使用import pyspark.sql.functions as F):

    df = df.select( '*', F.col('count').alias('new_count') ).drop('count')
    

    11

    方法1:

    df = df.withColumnRenamed("old_column_name", "new_column_name")
    

    方法2: 如果您想进行一些计算并重命名新值

    df = df.withColumn("old_column_name", F.when(F.col("old_column_name") > 1, F.lit(1)).otherwise(F.col("old_column_name"))
    df = df.drop("new_column_name", "old_column_name")
    

    1
    有很多类似的答案,所以不需要再发布重复的答案。 - astentx
    5
    在withColumnRenamed中,第一个参数是旧的列名。您的Method 1是错误的。 - Sheldore

    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接