在Spark中对多个列求和

14
我该如何在Spark中对多列求和?例如,在SparkR中,以下代码可以用于获取一列的总和,但如果我尝试获取df中两列的总和,则会出现错误。
# Create SparkDataFrame
df <- createDataFrame(faithful)

# Use agg to sum total waiting times
head(agg(df, totalWaiting = sum(df$waiting)))
##This works

# Use agg to sum total of waiting and eruptions
head(agg(df, total = sum(df$waiting, df$eruptions)))
##This doesn't work

无论是SparkR还是PySpark代码都可以使用。

6个回答

59

对于 PySpark,如果你不想显式地键入列名:

from operator import add
from functools import reduce
new_df = df.withColumn('total',reduce(add, [F.col(x) for x in numeric_col_list]))

6
为什么这个工具不在Spark API中? - Louis Yang
那是一个有用的技巧,肯定会帮助很多在谷歌这个问题的人,但并不是原来问题所询问的 :)(它询问的是聚合操作,而不是行操作) - Melkor.cz

8

在Pyspark中,您可以执行以下操作:

>>> from pyspark.sql import functions as F
>>> df = spark.createDataFrame([("a",1,10), ("b",2,20), ("c",3,30), ("d",4,40)], ["col1", "col2", "col3"])
>>> df.groupBy("col1").agg(F.sum(df.col2+df.col3)).show()
+----+------------------+
|col1|sum((col2 + col3))|
+----+------------------+
|   d|                44|
|   c|                33|
|   b|                22|
|   a|                11|
+----+------------------+

5
org.apache.spark.sql.functions.sum(Column e)

聚合函数:返回表达式中所有值的总和。

正如您所看到的,sum只需要一个列作为输入,因此sum(df$waiting, df$eruptions)不起作用。由于你想要对数值字段求和,可以执行sum(df("waiting") + df("eruptions"))。如果你想要对每个列的值进行求和,你可以使用 df.agg(sum(df$waiting),sum(df$eruptions)).show


5
对我而言,这个操作起作用了:df.withColumn("newCol", col("col1")+col("col2")) - Ali
@Ali 是的,那也是一个替代方案。 - Balaji Reddy
2
我理解的原始问题是关于聚合:垂直求和列(对于每一列,将所有行相加),而不是行操作:水平求和行(对于每一行,将该行中的列值相加)。 - Melkor.cz

3

SparkR 代码:

library(SparkR)
df <- createDataFrame(sqlContext,faithful)
w<-agg(df,sum(df$waiting)),agg(df,sum(df$eruptions))
head(w[[1]])
head(w[[2]])

3
你可以使用 expr()
import pyspark.sql.functions as f

numeric_cols = ['col_a','col_b','col_c']
df = df.withColumn('total', f.expr('+'.join(cols)))

PySpark的expr()是一个执行类似于SQL表达式的SQL函数。


它应该是df = df.withColumn('total', f.expr('+'.join(numeric_cols)))。 - Lijo Abraham

1

接受的答案对我很有帮助,但我发现下面这个更简单,而且不使用外部API。

sum_df = df.withColumn('total', lit(0))
for c in col_list:
    sum_df = sum_df.withColumn('total', col('total') + col(c))

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接