在pyspark中groupBy后如何计算唯一ID数量

66
我使用以下代码对学生进行年度聚合。目的是了解每年的学生总数。
from pyspark.sql.functions import col
import pyspark.sql.functions as fn
gr = Df2.groupby(['Year'])
df_grouped = 
gr.agg(fn.count(col('Student_ID')).alias('total_student_by_year'))

我发现的问题是很多ID重复,导致结果错误且巨大。

我想按年份汇总学生,统计每年的学生总数,并避免ID重复。


我从Hive表中调用了数据。 - Lizou
5个回答

135

使用countDistinct函数

from pyspark.sql.functions import countDistinct
x = [("2001","id1"),("2002","id1"),("2002","id1"),("2001","id1"),("2001","id2"),("2001","id2"),("2002","id2")]
y = spark.createDataFrame(x,["year","id"])

gr = y.groupBy("year").agg(countDistinct("id"))
gr.show()

输出

+----+------------------+
|year|count(DISTINCT id)|
+----+------------------+
|2002|                 2|
|2001|                 2|
+----+------------------+

15
为了完整起见,您还可以使用.alias()来重命名列。 - niczky12
1
请注意,countDistinct 不会将 Null 视为不同的值! - Thomas
基于哪个版本? - Noppu

5
你还可以这样做: gr.groupBy("year", "id").count().groupBy("year").count() 这个查询将返回每年独特的学生数。

1

如果你正在使用较旧版本的Spark并且没有countDistinct函数,你可以使用sizecollect_set函数的组合来复制它,像这样:

gr = gr.groupBy("year").agg(fn.size(fn.collect_set("id")).alias("distinct_count"))

如果您需要对多列进行不同计数,只需使用concat将这些列连接成一个新列,然后执行与上述相同的操作。

1

countDistinct()和多个聚合函数在流处理中都不被支持。


0

通过使用Spark/PySpark SQL

y.createOrReplaceTempView("STUDENT")
    
spark.sql("SELECT year, count(DISTINCT id) as count" + \
"FROM STUDENT group by year").show()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接