在PySpark数据框中添加列总和作为新列

49
我正在使用PySpark,有一个包含多列数字的Spark数据框。我想添加一列,该列是所有其他列的总和。
假设我的数据框有列"a"、"b"和"c"。我知道可以这样做:
df.withColumn('total_col', df.a + df.b + df.c)
问题在于我不想逐个输入每一列并将它们相加,特别是如果有很多列的话。我希望能够自动完成这个过程或通过指定一个要相加的列名列表来完成。有没有其他方法可以做到这一点?

这在RDDs中比数据框要容易得多,例如,如果数据是表示行的数组,则可以执行RDD.map(lambda data: (data, sum(data)))。使用Spark数据框更困难的主要原因是弄清楚在withColumn中允许作为列表达式的内容。它似乎没有很好的文档记录。 - Paul
这个好像也不行(PySpark 1.6.3):`dftest.withColumn("times", sum((dftest[c] > 2).cast("int") for c in dftest.columns[1:]))` 然后, `dftest.select('a', 'b', 'c', 'd').rdd.map(lambda x: (x, sum(x))).take(2)`似乎无法正常工作。 - Abhinav Sood
8个回答

57

这并不明显。在Spark数据帧API中,我没有看到定义列的基于行求和的方式。

版本2

这可以通过一个相当简单的方式完成:

newdf = df.withColumn('total', sum(df[col] for col in df.columns))

df.columns是由pyspark提供的一个字符串列表,其中包含Spark数据框中所有列的列名。要进行其他求和操作,您可以提供任何其他列名的列表。

我没有尝试这个作为我的第一个解决方案,因为我不确定它会表现如何。但是它能够工作。

版本1

这样做过于复杂,但同样适用。

您可以执行以下操作:

  1. 使用df.columns获取列名列表
  2. 使用该名称列表创建列列表
  3. 将该列表传递给某些函数,以在折叠式函数式方式中调用列重载的加法函数

借助Python的reduce,一些关于运算符重载的知识以及pyspark代码中的列(此处),您可以编写以下代码:

def column_add(a,b):
     return  a.__add__(b)

newdf = df.withColumn('total_col', 
         reduce(column_add, ( df[col] for col in df.columns ) ))

请注意,这是Python的reduce函数,而不是Spark RDD的reduce函数。在reduce的第二个参数中,括号术语需要加括号,因为它是一个列表生成式。

已测试,可行!

$ pyspark
>>> df = sc.parallelize([{'a': 1, 'b':2, 'c':3}, {'a':8, 'b':5, 'c':6}, {'a':3, 'b':1, 'c':0}]).toDF().cache()
>>> df
DataFrame[a: bigint, b: bigint, c: bigint]
>>> df.columns
['a', 'b', 'c']
>>> def column_add(a,b):
...     return a.__add__(b)
...
>>> df.withColumn('total', reduce(column_add, ( df[col] for col in df.columns ) )).collect()
[Row(a=1, b=2, c=3, total=6), Row(a=8, b=5, c=6, total=19), Row(a=3, b=1, c=0, total=4)]

@Salmonerd 谢谢。有时候记住 Spark dataframe 类是不可变的对于帮助很有帮助,因此要在数据中进行任何更改,都必须调用返回新 dataframe 的某些内容。 - Paul
5
版本2无法与Spark 1.5.0和CDH-5.5.2以及Python版本3.4兼容。它会抛出一个错误:"AttributeError: 'generator' object has no attribute '_get_object_id" - Hemant
1
@Paul 我使用了VERSION 2方法来累加几个“bigint”类型的列。不知何故,我遇到了这个错误:'generator' object has no attribute '_get_object_id'。你知道为什么会发生这种情况吗?谢谢! - Elsa Li
5
版本2无法运行。抛出“TypeError: 'Column' object is not callable”错误。 - Augmented Jacob
3
版本1对我不起作用,出现错误“列不可迭代”。 - Vincent Chalmel
显示剩余11条评论

21
最直接的方法是使用 expr 函数。
from pyspark.sql.functions import *
data = data.withColumn('total', expr("col1 + col2 + col3 + col4"))

这并没有解决需要逐个输入每个列名的问题。 - Jan Jaap Meijerink
@JanJaapMeijerink 列表推导式可以用来构建表达式,下面是一个示例。 - SNicolaou

20
解决方案。
newdf = df.withColumn('total', sum(df[col] for col in df.columns))

由@Paul Works发布。尽管我看到了许多其他人遇到的错误,但我仍然遇到了同样的问题。

TypeError: 'Column' object is not callable

经过一段时间的调查,我发现了问题(至少在我的情况下)。问题是我之前使用了这条代码导入了一些pyspark函数:

from pyspark.sql.functions import udf, col, count, sum, when, avg, mean, min

因此,这一行导入了的sum命令,而df.withColumn('total',sum(df[col] for col in df.columns))应该使用普通的Python sum函数。

您可以使用del sum删除函数的引用。

否则,在我的情况下,我将导入更改为

import pyspark.sql.functions as F

然后将函数引用为F.sum


谢谢@Francesco。我遇到了同样的问题,并像你提到的那样使用“del sum”解决了它。 - Vamshidhar H.K.
1
很高兴我不是唯一犯这个错误的人。 - roschach

13

将列表中的多个列汇总为一列

PySpark的sum函数不支持列相加。可以使用expr函数来实现。

from pyspark.sql.functions import expr

cols_list = ['a', 'b', 'c']

# Creating an addition expression using `join`
expression = '+'.join(cols_list)

df = df.withColumn('sum_cols', expr(expression))

这样就给我们提供了列的所需总和。


1
df = spark.createDataFrame([("linha1", "valor1", 2), ("linha2", "valor2", 5)], ("Columna1", "Columna2", "Columna3"))

df.show()

+--------+--------+--------+
|Columna1|Columna2|Columna3|
+--------+--------+--------+
|  linha1|  valor1|       2|
|  linha2|  valor2|       5|
+--------+--------+--------+

df = df.withColumn('DivisaoPorDois', df[2]/2)
df.show()

+--------+--------+--------+--------------+
|Columna1|Columna2|Columna3|DivisaoPorDois|
+--------+--------+--------+--------------+
|  linha1|  valor1|       2|           1.0|
|  linha2|  valor2|       5|           2.5|
+--------+--------+--------+--------------+

df = df.withColumn('Soma_Colunas', df[2]+df[3])
df.show()

+--------+--------+--------+--------------+------------+
|Columna1|Columna2|Columna3|DivisaoPorDois|Soma_Colunas|
+--------+--------+--------+--------------+------------+
|  linha1|  valor1|       2|           1.0|         3.0|
|  linha2|  valor2|       5|           2.5|         7.5|
+--------+--------+--------+--------------+------------+

1
我的问题与上面类似(稍微复杂一些),因为我需要在PySpark数据框中添加连续的列求和作为新列。这种方法使用了Paul Version 1中的代码:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.appName('addColAsCumulativeSUM').getOrCreate()
df=spark.createDataFrame(data=[(1,2,3),(4,5,6),(3,2,1)\
                              ,(6,1,-4),(0,2,-2),(6,4,1)\
                              ,(4,5,2),(5,-3,-5),(6,4,-1)]\
                              ,schema=['x1','x2','x3'])
df.show()

+---+---+---+
| x1| x2| x3|
+---+---+---+
|  1|  2|  3|
|  4|  5|  6|
|  3|  2|  1|
|  6|  1| -4|
|  0|  2| -2|
|  6|  4|  1|
|  4|  5|  2|
|  5| -3| -5|
|  6|  4| -1|
+---+---+---+

colnames=df.columns

添加新列,这些列是累积和(连续的):
for i in range(0,len(colnames)):
    colnameLst= colnames[0:i+1]
    colname = 'cm'+ str(i+1)
    df = df.withColumn(colname, sum(df[col] for col in colnameLst))

df.show()

+---+---+---+---+---+---+
| x1| x2| x3|cm1|cm2|cm3|
+---+---+---+---+---+---+
|  1|  2|  3|  1|  3|  6|
|  4|  5|  6|  4|  9| 15|
|  3|  2|  1|  3|  5|  6|
|  6|  1| -4|  6|  7|  3|
|  0|  2| -2|  0|  2|  0|
|  6|  4|  1|  6| 10| 11|
|  4|  5|  2|  4|  9| 11|
|  5| -3| -5|  5|  2| -3|
|  6|  4| -1|  6| 10|  9|
+---+---+---+---+---+---+

"累计总和"列的添加如下所示:
cm1 = x1
cm2 = x1 + x2
cm3 = x1 + x2 + x3

0
一个非常简单的方法是使用select而不是withcolumn,如下所示: df = df.select('*', (col("a")+col("b")+col('c).alias("total")) 根据要求进行微小的更改,就可以得到所需的总和。

0

以下方法适用于我:

  1. 导入pyspark sql函数
    from pyspark.sql import functions as F
  2. 使用F.expr(list_of_columns)
    data_frame.withColumn('Total_Sum',F.expr('col_name1+col_name2+..col_namen)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接