如何将数据聚合到范围内(分桶)?

3

我有一个表格,如下所示:

+---------------+------+
|id             | value|
+---------------+------+
|               1|118.0|
|               2|109.0|
|               3|113.0|
|               4| 82.0|
|               5| 60.0|
|               6|111.0|
|               7|107.0|
|               8| 84.0|
|               9| 91.0|
|              10|118.0|
+---------------+------+

我想将值聚合或分组到一个范围内,如0,10,20,30,40,...80,90,100,110,120。我该如何在SQL或更具体地说Spark SQL中执行此操作?
目前我使用了一个与范围进行连接的lateral view join,但这似乎相当笨拙/低效。
Quantile discretized并不是我想要的,而是使用此范围的CUT。
编辑 https://github.com/collectivemedia/spark-ext/blob/master/sparkext-mllib/src/main/scala/org/apache/spark/ml/feature/Binning.scala可以执行动态分箱,但我更喜欢使用指定的范围。

org.apache.spark.ml.feature.Bucketizer 接受一个明确提供的分割点数组。然后你就可以在输出列上进行分组。 - Hristo Iliev
1
我认为在这种情况下,所提出的解决方案更简单/可能更有效。 - Georg Heiler
2个回答

15

在一般情况下,可以使用org.apache.spark.ml.feature.Bucketizer来执行静态分箱:

val df = Seq(
  (1, 118.0), (2, 109.0), (3, 113.0), (4, 82.0), (5, 60.0),
  (6, 111.0), (7, 107.0), (8,  84.0), (9, 91.0), (10, 118.0)
).toDF("id", "value")

val splits = (0 to 12).map(_ * 10.0).toArray

import org.apache.spark.ml.feature.Bucketizer
val bucketizer = new Bucketizer()
  .setInputCol("value")
  .setOutputCol("bucket")
  .setSplits(splits)

val bucketed = bucketizer.transform(df)

val solution = bucketed.groupBy($"bucket").agg(count($"id") as "count")

结果:

scala> solution.show
+------+-----+
|bucket|count|
+------+-----+
|   8.0|    2|
|  11.0|    4|
|  10.0|    2|
|   6.0|    1|
|   9.0|    1|
+------+-----+

当值位于定义的区间之外时,Bucketizer会抛出错误。可以将分割点定义为Double.NegativeInfinityDouble.PositiveInfinity以捕获异常值。

Bucketizer的设计是通过对正确的桶进行二进制搜索来高效地处理任意拆分。对于像您这样的常规箱子,可以简单地执行以下操作:

val binned = df.withColumn("bucket", (($"value" - bin_min) / bin_width) cast "int")

其中bin_minbin_width分别是最小箱子的左间隔和箱子宽度。


但是,如果一个桶为空,则分组也不会返回结果。所以,如果我想看到所有桶的列表(包括空桶和数量为0的桶),是否可以在不使用连接的情况下执行此操作? - Georg Heiler
将区间分组后进行连接应该非常有效。 - Hristo Iliev

3

尝试使用 "GROUP BY" 进行操作

SELECT id, (value DIV 10)*10 FROM table_name ;

以下将使用 Scala 的 Dataset API:
df.select(('value divide 10).cast("int")*10)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接