Spark DataFrame:统计每列的唯一值数量

44

问题其实已经在标题中了:是否有一种高效的方法来计算数据框中每一列不同值的数量?

describe 方法只提供了计数,但没有提供不同计数,我想知道是否有一种方法可以获取所有(或某些选择的)列的不同计数。

6个回答

76

pySpark中,您可以像这样使用countDistinct()

from pyspark.sql.functions import col, countDistinct

df.agg(*(countDistinct(col(c)).alias(c) for c in df.columns))

同样在Scala中:

import org.apache.spark.sql.functions.countDistinct
import org.apache.spark.sql.functions.col

df.select(df.columns.map(c => countDistinct(col(c)).alias(c)): _*)

如果你想加快速度,但可能会损失一些准确性,你也可以使用approxCountDistinct()


请问您能解释一下在您的Pyspark解决方案中这里的 * 是什么意思吗? - Peybae
4
Python 中的星号操作符可以用于将迭代器中的参数解包并传递给函数调用,详见这里 - mtoto

42

多重聚合计算成本相当高。我建议您使用近似方法代替。在这种情况下,近似不同的计数:

val df = Seq((1,3,4),(1,2,3),(2,3,4),(2,3,5)).toDF("col1","col2","col3")

val exprs = df.columns.map((_ -> "approx_count_distinct")).toMap
df.agg(exprs).show()
// +---------------------------+---------------------------+---------------------------+
// |approx_count_distinct(col1)|approx_count_distinct(col2)|approx_count_distinct(col3)|
// +---------------------------+---------------------------+---------------------------+
// |                          2|                          2|                          3|
// +---------------------------+---------------------------+---------------------------+
approx_count_distinct 方法依赖于 HyperLogLog 算法。在 Spark 中实现的 HyperLogLog++ 算法,基于以下聪明的观察:如果数字均匀地分布在一个范围内,则可以通过二进制表示中前导零的最大数量来近似计算不同元素的数量。例如,如果我们观察到一个数字,其二进制形式的数字为 0…(重复 k 次)…01…1,那么我们可以估计集合中的元素数量约为 2^k。这是一个非常粗略的估计,但可以通过草图算法进行精细优化。有关此算法背后机制的详细说明,请参阅 原始论文

注意:Spark 1.6 开始,当 Spark 调用SELECT SOME_AGG(DISTINCT foo)), SOME_AGG(DISTINCT bar)) FROM df 时,每个子句都应该触发单独的聚合。这与SELECT SOME_AGG(foo), SOME_AGG(bar) FROM df 不同,后者只进行一次聚合。因此,在使用 count(distinct(_))approxCountDistinct(或 approx_count_distinct)时,性能不可比较。

这是自 Spark 1.6 以来的一项行为变更之一:

有了对具有不同聚合的查询的改进的查询计划程序(SPARK-9241),拥有单个去重聚合的查询的计划已更改为更健壮的版本。要切换回由Spark 1.5的计划程序生成的计划,请将 spark.sql.specializeSingleDistinctAggPlanning 设置为 true。(SPARK-12077)

参考资料:Apache Spark中的近似算法:HyperLogLog与Quantiles

只是一个警告:请注意,对于几乎每个值都是唯一的列,在默认配置下,approx_count_distinct 可能会出现高达10%的误差,并且实际上可能需要与 count_distinct 相同的时间。它甚至可能返回比实际行数更高的值。 - Thomas
这是正确的,但你的数据集越大,误差就越低。 - eliasah
你在PySpark中怎么做这个? - Ross Brigoli

22

如果您只想计算特定列,则以下方法可能有所帮助。虽然回答有点晚,但它可能会对某些人有所帮助。(已测试pyspark 2.2.0

from pyspark.sql.functions import col, countDistinct
df.agg(countDistinct(col("colName")).alias("count")).show()

3
如果您想将答案保存到一个变量中而不是显示给用户,可以用 .collect()[0][0] 替换 .show() - Sarah Messer

9

在desaiankitb的回答基础上,以下回答可能更为直观:

from pyspark.sql.functions import count

df.groupBy(colname).count().show()

对于那些希望显示所选列中每个唯一值出现次数的人来说,这是一个很好的答案。 - Andreas L.
要获取唯一值的数量,只需使用 len(df.groupBy(colname).count().show()) 即可。 - haneulkim

1
你可以使用 SQL 的 count(列名) 函数。
另外,如果你正在进行数据分析,并且想要对每个列进行粗略估计而不是准确计数,可以使用 approx_count_distinct(expr[, relativeSD]) 函数。

0

这是一种创建包含每列计数的数据框的方法:

> df = df.to_pandas_on_spark()
>         collect_df = []
>         for i in df.columns:
>             collect_df.append({"field_name": i , "unique_count": df[i].nunique()})
>         uniquedf = spark.createDataFrame(collect_df)

输出如下。我将其与另一个数据帧一起使用,以比较列名相同的值。另一个数据帧也是这样创建的,然后连接在一起。

df_prod_merged = uniquedf1.join(uniquedf2, on='field_name', how="left")

这是一种简单的方法,但处理像1TB这样庞大的数据可能会很昂贵,但在使用to_pandas_on_spark()时仍然非常高效。

enter image description here


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接