这并不明显。在Spark数据帧API中,我没有看到定义列的基于行求和的方式。
版本2
这可以通过一个相当简单的方式完成:
newdf = df.withColumn('total', sum(df[col] for col in df.columns))
df.columns
是由pyspark提供的一个字符串列表,其中包含Spark数据框中所有列的列名。要进行其他求和操作,您可以提供任何其他列名的列表。
我没有尝试这个作为我的第一个解决方案,因为我不确定它会表现如何。但是它能够工作。
版本1
这样做过于复杂,但同样适用。
您可以执行以下操作:
- 使用
df.columns
获取列名列表
- 使用该名称列表创建列列表
- 将该列表传递给某些函数,以在折叠式函数式方式中调用列重载的加法函数
借助Python的reduce,一些关于运算符重载的知识以及pyspark代码中的列(此处),您可以编写以下代码:
def column_add(a,b):
return a.__add__(b)
newdf = df.withColumn('total_col',
reduce(column_add, ( df[col] for col in df.columns ) ))
请注意,这是Python的reduce函数,而不是Spark RDD的reduce函数。在reduce的第二个参数中,括号术语需要加括号,因为它是一个列表生成式。
已测试,可行!
$ pyspark
>>> df = sc.parallelize([{'a': 1, 'b':2, 'c':3}, {'a':8, 'b':5, 'c':6}, {'a':3, 'b':1, 'c':0}]).toDF().cache()
>>> df
DataFrame[a: bigint, b: bigint, c: bigint]
>>> df.columns
['a', 'b', 'c']
>>> def column_add(a,b):
... return a.__add__(b)
...
>>> df.withColumn('total', reduce(column_add, ( df[col] for col in df.columns ) )).collect()
[Row(a=1, b=2, c=3, total=6), Row(a=8, b=5, c=6, total=19), Row(a=3, b=1, c=0, total=4)]
RDD.map(lambda data: (data, sum(data)))
。使用Spark数据框更困难的主要原因是弄清楚在withColumn
中允许作为列表达式的内容。它似乎没有很好的文档记录。 - Paul`dftest.withColumn("times", sum((dftest[c] > 2).cast("int") for c in dftest.columns[1:]))` 然后, `dftest.select('a', 'b', 'c', 'd').rdd.map(lambda x: (x, sum(x))).take(2)`
似乎无法正常工作。 - Abhinav Sood