Spark window functions vs. groupBy aggregation: performance question


I have a dataframe like this:

| id | date       | KPI_1 | ... | KPI_n |
| 1  | 2012-12-12 | 0.1   | ... | 0.5   |
| 2  | 2012-12-12 | 0.2   | ... | 0.4   |
| 3  | 2012-12-12 | 0.66  | ... | 0.66  |
| 1  | 2012-12-13 | 0.2   | ... | 0.46  |
| 4  | 2012-12-14 | 0.2   | ... | 0.45  |
| ...                                   |
| 55 | 2013-03-15 | 0.5   | ... | 0.55  |

We have:

  • X identifiers
  • one row per identifier for a given date
  • n KPIs (key performance indicators)

For each row I need to compute some derived KPIs, and these derived KPIs depend on the previous values for each id. Let's say my derived KPI is the difference (a short sketch of this simple case follows the table below); it would then be:

| id | date       | KPI_1 | ... | KPI_n | KPI_1_diff | KPI_n_diff |
| 1  | 2012-12-12 | 0.1   | ... | 0.5   | 0.1        | 0.5        |
| 2  | 2012-12-12 | 0.2   | ... | 0.4   | 0.2        | 0.4        |
| 3  | 2012-12-12 | 0.66  | ... | 0.66  | 0.66       | 0.66       |
| 1  | 2012-12-13 | 0.2   | ... | 0.46  | 0.2 - 0.1  | 0.46 - 0.5 |
| 4  | 2012-12-13 | 0.2   | ... | 0.45  | ...        | ...        |
| ...                                                             |
| 55 | 2013-03-15 | 0.5   | ... | 0.55  |            |            |
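As a side note, this simple per-id difference could be sketched directly with lag over a per-id window. This is only meant to pin down the semantics of the example (the real derived KPIs also involve other statistics, hence the question below); myDF and the column names are the ones from the table, and the default of 0 reproduces the first-row behaviour:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag}

// Sketch only: assumes the derived KPI really is "current value minus the
// previous value for the same id", with 0 as the default when there is no
// previous row (so the first row keeps its own value, as in the table above).
val idWindow = Window.partitionBy(col("id")).orderBy(col("date"))

val diffDF = myDF
  .withColumn("KPI_1_diff", col("KPI_1") - lag(col("KPI_1"), 1, 0).over(idWindow))
  .withColumn("KPI_n_diff", col("KPI_n") - lag(col("KPI_n"), 1, 0).over(idWindow))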

Now, what I would do is:

import org.apache.spark.sql.functions.{col, collect_list, struct}

val groupedDF = myDF.groupBy("id").agg(
  collect_list(struct(col("date"), col("KPI_1"))).as("wrapped_KPI_1"),
  collect_list(struct(col("date"), col("KPI_2"))).as("wrapped_KPI_2")
  // up until the nth KPI
)

and I would get aggregated data such as:

[("2012-12-12",0.1),("2012-12-12",0.2) ...

Then I would sort this wrapped data, unwrap it, and map over the aggregated result with some UDFs to produce the output (compute the differences and other statistics).
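To make that step concrete, here is a rough, hypothetical sketch of such a sort-unwrap-diff UDF for the simple difference case. The name diffsUdf is invented, and it assumes the date field is stored as an ISO-formatted string so that lexicographic sorting is chronological:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}

// Hypothetical UDF: sorts the collected (date, value) pairs by date and emits
// the pairwise differences; the first value is kept as-is, as in the example.
val diffsUdf = udf { (wrapped: Seq[Row]) =>
  val values = wrapped
    .map(r => (r.getString(0), r.getDouble(1)))  // assumes date is an ISO string
    .sortBy(_._1)
    .map(_._2)
  values.zip(0.0 +: values).map { case (cur, prev) => cur - prev }
}

val diffedDF = groupedDF.withColumn("KPI_1_diffs", diffsUdf(col("wrapped_KPI_1")))

The same UDF could then be applied to each wrapped_KPI_i column in turn.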

The other approach is to use a window function, for example:

import org.apache.spark.sql.expressions.Window
val window = Window.partitionBy(col("id")).orderBy(col("date")).rowsBetween(Window.unboundedPreceding, Window.currentRow)

and do:

val windowedDF = df.select(
  col("id"),
  col("date"),
  col("KPI_1"),
  collect_list(struct(col("date"), col("KPI_1"))).over(window),
  collect_list(struct(col("date"), col("KPI_2"))).over(window)
)

This way I get:

[("2012-12-12",0.1)]
[("2012-12-12",0.1), ("2012-12-13",0.1)]
...

This way of handling it looks nicer, but I suspect that repeating the window produces unnecessary grouping and sorting for each KPI.
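One way to move from suspicion to evidence is to compare the physical plans of the two variants; extra Exchange / Sort nodes per KPI in the windowed plan would confirm the redundant work:

// Inspect the physical plans: additional Exchange / Sort nodes per KPI would
// indicate that the repeated windows really do redundant shuffling/sorting.
windowedDF.explain()
groupedDF.explain()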

So the questions are:

  1. Am I better off going with the grouping approach?
  2. Should I go with the window approach? If so, what is the most efficient way to do it?

You should go with the window approach, but before the select, repartition the dataframe on id to help reduce the shuffling: val windowedDF = df.repartition(col("id")).select(...) - Apurba Pandey
So every time the "window" is evaluated I might end up triggering a "re-sort"? Hence repartitioning up front produces data that is already partitioned and sorted, so the "window" becomes a "no-op"? And is there a way to "reuse" the same window to emit more complex structured data? - JayZee
Yes. I tried this earlier in my code for a majority-vote computation. It shuffles the dataframe only once and reuses it for all the window functions. - Apurba Pandey
OK, thanks a lot. I think you should post this as an answer so it is more visible and I can upvote it. - JayZee
2 Answers


I think the window approach should be the better solution, but before using the window functions you should repartition the dataframe on id. This shuffles the data only once, and all the window functions then execute on the already-shuffled dataframe. Hope this helps.

The code should look like this:

// `window` here is the window spec defined in the question:
// Window.partitionBy(col("id")).orderBy(col("date")).rowsBetween(...)
val windowedDF = df.repartition(col("id"))
  .select(
    col("id"),
    col("date"),
    col("KPI_1"),
    col("KPI_2"),
    collect_list(struct(col("date"), col("KPI_1"))).over(window),
    collect_list(struct(col("date"), col("KPI_2"))).over(window)
  )

@Raphael Roth

Here we are aggregating over a single window, which is why you may be seeing the same execution plan. Please see the example below, where aggregation over multiple windows can be served from a single partitioning.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.collect_list
import spark.implicits._  // for toDF / $"..." (assumes a SparkSession named `spark`)

val list = Seq(( "2", null, 1, 11, 1, 1 ),
  ( "2", null, 1, 22, 2, 2 ),
  ( "2", null, 1, 11, 1, 3 ),
  ( "2", null, 1, 22, 2, 1 ),
  ( "2", null, 1, 33, 1, 2 ),
  ( null, "3", 3, 33, 1, 2 ),
  ( null, "3", 3, 33, 2, 3 ),
  ( null, "3", 3, 11, 1, 1 ),
  ( null, "3", 3, 22, 2, 2 ),
  ( null, "3", 3, 11, 1, 3 )
)

val df = spark.sparkContext.parallelize(list).toDF("c1","c2","batchDate","id", "pv" , "vv")

val c1Window = Window.partitionBy("batchDate", "c1")
val c2Window = Window.partitionBy("batchDate", "c2")

val agg1df = df.withColumn("c1List",collect_list("pv").over(c1Window))
  .withColumn("c2List", collect_list("pv").over(c2Window))

val agg2df = df.repartition($"batchDate")
  .withColumn("c1List",collect_list("pv").over(c1Window))
  .withColumn("c2List", collect_list("pv").over(c2Window))


agg1df.explain()
== Physical Plan ==
Window [collect_list(pv#18, 0, 0) windowspecdefinition(batchDate#16, c2#15, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS c2List#38], [batchDate#16, c2#15]
+- *Sort [batchDate#16 ASC NULLS FIRST, c2#15 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(batchDate#16, c2#15, 1)
      +- Window [collect_list(pv#18, 0, 0) windowspecdefinition(batchDate#16, c1#14, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS c1List#28], [batchDate#16, c1#14]
         +- *Sort [batchDate#16 ASC NULLS FIRST, c1#14 ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(batchDate#16, c1#14, 1)
               +- *Project [_1#7 AS c1#14, _2#8 AS c2#15, _3#9 AS batchDate#16, _4#10 AS id#17, _5#11 AS pv#18, _6#12 AS vv#19]
                  +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple6, true])._1, true) AS _1#7, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple6, true])._2, true) AS _2#8, assertnotnull(input[0, scala.Tuple6, true])._3 AS _3#9, assertnotnull(input[0, scala.Tuple6, true])._4 AS _4#10, assertnotnull(input[0, scala.Tuple6, true])._5 AS _5#11, assertnotnull(input[0, scala.Tuple6, true])._6 AS _6#12]
                     +- Scan ExternalRDDScan[obj#6]

agg2df.explain()
== Physical Plan ==
Window [collect_list(pv#18, 0, 0) windowspecdefinition(batchDate#16, c2#15, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS c2List#60], [batchDate#16, c2#15]
+- *Sort [batchDate#16 ASC NULLS FIRST, c2#15 ASC NULLS FIRST], false, 0
   +- Window [collect_list(pv#18, 0, 0) windowspecdefinition(batchDate#16, c1#14, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS c1List#50], [batchDate#16, c1#14]
      +- *Sort [batchDate#16 ASC NULLS FIRST, c1#14 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(batchDate#16, 1)
            +- *Project [_1#7 AS c1#14, _2#8 AS c2#15, _3#9 AS batchDate#16, _4#10 AS id#17, _5#11 AS pv#18, _6#12 AS vv#19]
               +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple6, true])._1, true) AS _1#7, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple6, true])._2, true) AS _2#8, assertnotnull(input[0, scala.Tuple6, true])._3 AS _3#9, assertnotnull(input[0, scala.Tuple6, true])._4 AS _4#10, assertnotnull(input[0, scala.Tuple6, true])._5 AS _5#11, assertnotnull(input[0, scala.Tuple6, true])._6 AS _6#12]
                  +- Scan ExternalRDDScan[obj#6]

kpi1DF.repartition(col("short_name")).sortWithinPartitions(col("reading_date")) - JayZee
Here we are aggregating into a collected list, so the ordering does not really matter in this case. It might help for further computations if you need the list to be ordered. - Apurba Pandey
Can you show the Spark plans with and without the repartition to back up your answer? I did some tests myself: using 2 window functions over the same window does not result in 2 shuffles, only 1. And adding the repartition did not change the plan... - Raphael Roth
I have modified the answer and added the plans, please have a look. - Apurba Pandey


Suppose you need to extract records based on the count of a given group.

We can achieve this in two ways.

Approach 1:

val dedup = inputDF.select(col("A"), col("B"), col("C"), col("D"))
  .groupBy("A", "B", "C", "D").count().as("count")

val snapDF = inputDF.as("baseLoad")
  .join(dedup.as("dedup"), inputDF("A") === dedup("A") && inputDF("B") === dedup("B") &&
    inputDF("C") === dedup("C") && inputDF("D") === dedup("D"), "leftouter")
  .select("baseLoad.*", "dedup.count")
  .filter(col("count") > 2)

Approach 2:

val win = Window.partitionBy("A", "B", "C", "D")
val win_count = inputDF.withColumn("row_count", count("*").over(win)).filter(col("row_count") > 2)

Of the two approaches above:

  • Approach 1 took 25 minutes
  • Approach 2 took 4 minutes

So, whenever you have the option to use a window function instead of a groupBy, always go with the window function.

