使用Scala在Spark DataFrame中基于现有列的聚合添加新列

5

我有一个类似下面的DataFrame。我需要基于现有列创建一个新列。

col1 col2
a      1
a      2
b      1
c      1
d      1
d      2

输出的数据框应该长成这样

col1  col2 col3 col4
a      1   1      2
a      2   1      2
b      1   0      1
c      1   0      1
d      1   1      2
d      2   1      2

我用来找到 col3 的逻辑是如果 col1 的计数>1并且col4 是 col2 的最大值
我熟悉如何在 SQL 中实现,但使用 DataFrame DSL 很难找到解决方案。任何帮助将不胜感激。谢谢。
4个回答

6

1
+1 是为了加入和分组的概念。仅作澄清,col3 不是 col2 的总和,而是 col2 的计数。如果 col2 > 1,则应为 1,否则应为零。没有使用 join 的方法吗?当我在大量数据中使用 join 时,会遇到内存错误。谢谢。 - John
是的,我也想知道没有使用连接的解决方案。 - Shrikant Prabhu

2

Spark数据框具有一个叫做withColumn的属性,您可以添加任意多个派生列。但是该列不会被添加到现有的数据框中,而是创建一个新的带有添加列的数据框。

例如,向数据中添加静态日期:

val myFormattedData = myData.withColumn("batchdate",addBatchDate(myData("batchdate")))
val addBatchDate = udf { (BatchDate: String) => "20160101" }

2
要添加 col3,您可以使用 withcolumn + when/otherwise:
val df2 = df.withColumn("col3",when($"col2" > 1, 1).otherwise(0))

如果要添加col4,则可以使用已经提到的groupBy / max + join:

val df3 = df2.join(df.groupBy("col1").max("col2"), "col1")

2
为了不使用连接实现此目的,您需要使用countmax作为窗口函数。这需要使用Window创建一个窗口,并告诉countmax在此窗口上进行操作。
from pyspark.sql import Window, functions as fn

df = sc.parallelize([
    {'col1': 'a', 'col2': 1},
    {'col1': 'a', 'col2': 2},
    {'col1': 'b', 'col2': 1},
    {'col1': 'c', 'col2': 1},
    {'col1': 'd', 'col2': 1},
    {'col1': 'd', 'col2': 2}
]).toDF()

col1_window = Window.partitionBy('col1')
df = df.withColumn('col3', fn.when(fn.count('col1').over(col1_window) > 1, 1).otherwise(0))
df = df.withColumn('col4', fn.max('col2').over(col1_window))
df.orderBy(['col1', 'col2']).show()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接