Pyspark:在窗口中计算唯一值的数量

43

我刚刚尝试在窗口上执行countDistinct,但出现了以下错误:

分析异常:u'不支持唯一的窗口函数:count(distinct color#1926)

有没有办法在pyspark中对窗口进行唯一计数?

这是一些示例代码:

from pyspark.sql.window import Window    
from pyspark.sql import functions as F

#function to calculate number of seconds from number of days
days = lambda i: i * 86400

df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00", "orange"),
                    (13, "2017-03-15T12:27:18+00:00", "red"),
                    (25, "2017-03-18T11:27:18+00:00", "red")],
                    ["dollars", "timestampGMT", "color"])
                    
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))

#create window by casting timestamp to long (number of seconds)
w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0))

df = df.withColumn('distinct_color_count_over_the_last_week', F.countDistinct("color").over(w))

df.show()

这是我想要看到的输出:

+-------+--------------------+------+---------------------------------------+
|dollars|        timestampGMT| color|distinct_color_count_over_the_last_week|
+-------+--------------------+------+---------------------------------------+
|     17|2017-03-10 15:27:...|orange|                                      1|
|     13|2017-03-15 12:27:...|   red|                                      2|
|     25|2017-03-18 11:27:...|   red|                                      1|
+-------+--------------------+------+---------------------------------------+
2个回答

75

编辑:如下面noleto的答案中所提到的,自PySpark 2.1以来已经有了approx_count_distinct可用于窗口。


原始回答-精确独特计数(非近似)

我们可以使用sizecollect_set的组合来模拟countDistinct在窗口上的功能:

from pyspark.sql import functions as F, Window

# Function to calculate number of seconds from number of days
days = lambda i: i * 86400

# Create some test data
df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00", "orange"),
                    (13, "2017-03-15T12:27:18+00:00", "red"),
                    (25, "2017-03-18T11:27:18+00:00", "red")],
                    ["dollars", "timestampGMT", "color"])
       
# Convert string timestamp to timestamp type             
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))

# Create window by casting timestamp to long (number of seconds)
w = Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0)

# Use collect_set and size functions to perform countDistinct over a window
df = df.withColumn('distinct_color_count_over_the_last_week', F.size(F.collect_set("color").over(w)))

df.show()

这将导致上一周记录中颜色的不同计数:

+-------+--------------------+------+---------------------------------------+
|dollars|        timestampGMT| color|distinct_color_count_over_the_last_week|
+-------+--------------------+------+---------------------------------------+
|     17|2017-03-10 15:27:...|orange|                                      1|
|     13|2017-03-15 12:27:...|   red|                                      2|
|     25|2017-03-18 11:27:...|   red|                                      1|
+-------+--------------------+------+---------------------------------------+

如果您的“countDistinct”位于多个列之间怎么办? “collect_set”只能接受单个列名称。 - Jon Deaton
1
这是一个折中的方法,但我做过的一件事就是创建一个新列,它是两个列的串联。比如,如果你有一个名字列和一个姓氏列,添加第三列,将两个列相加。然后你可以使用这个新列来进行collect_set操作。 - Bob Swain
4
有趣。我一直在使用的解决方法是使用 groupBycountDistinct 进行聚合,然后再将结果与原始分组的 DataFrame 进行 join。我想知道对于大型集群来说哪种方法更有效? - Jon Deaton
我认为添加新列会使用更多的RAM,特别是如果你正在处理大量列或列很大的情况下,但它不会增加太多的计算复杂性。 - Bob Swain
当使用orderBy时,我注意到性能问题,它会将所有结果带回驱动程序。 - Salman Ghauri

33

@Bob Swain的回答很好并且有效!从那时起,自Spark 2.1版本开始,Spark提供了一个等价于countDistinct函数的函数approx_count_distinct,使用更高效,最重要的是,支持在窗口中计算不同值的数量。

以下是可替换的代码:

#approx_count_distinct supports a window
df = df.withColumn('distinct_color_count_over_the_last_week', F.approx_count_distinct("color").over(w))

对于基数很小的列,结果应该和"countDistinct"相同。当数据集增长很多时,您应该考虑调整参数rsd——允许的最大估计误差,从而可以调节精度/性能的权衡。


3
结果是否与“countDistinct”相同?有什么保证吗?如果我使用默认的rsd = 0.05,这是否意味着对于基数<20,它将100%的时间返回正确的结果? - Kombajn zbożowy

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接