Spark DAG在使用'withColumn'和'select'时有所不同

15

背景

最近在一个Stack Overflow的帖子中,我发现使用withColumn可以改善DAG,特别是处理堆叠/链式列表达式与distinct窗口规范相结合的情况。然而,在这个例子中,withColumn实际上会使DAG更糟,并且与使用select的结果不同。

可重现的示例

首先,一些测试数据(PySpark 2.4.4 standalone):

import pandas as pd
import numpy as np

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

dfp = pd.DataFrame(
    {
        "col1": np.random.randint(0, 5, size=100),
        "col2": np.random.randint(0, 5, size=100),
        "col3": np.random.randint(0, 5, size=100),
        "col4": np.random.randint(0, 5, size=100),      
        "col5": np.random.randint(0, 5, size=100),        

    }
)

df = spark.createDataFrame(dfp)
df.show(5)

+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|   0|   3|   2|   2|   2|
|   1|   3|   3|   2|   4|
|   0|   0|   3|   3|   2|
|   3|   0|   1|   4|   4|
|   4|   0|   3|   3|   3|
+----+----+----+----+----+
only showing top 5 rows

这个例子很简单。它包含了两个窗口规范和基于它们的四个独立列表达式:

w1 = Window.partitionBy("col1").orderBy("col2")
w2 = Window.partitionBy("col3").orderBy("col4")

col_w1_1 = F.max("col5").over(w1).alias("col_w1_1")
col_w1_2 = F.sum("col5").over(w1).alias("col_w1_2")
col_w2_1 = F.max("col5").over(w2).alias("col_w2_1")
col_w2_2 = F.sum("col5").over(w2).alias("col_w2_2")

expr = [col_w1_1, col_w1_2, col_w2_1, col_w2_2]

使用withColumn会产生4次Shuffle操作

如果withColumn与交替的窗口规范一起使用,DAG将产生不必要的Shuffle操作:

df.withColumn("col_w1_1", col_w1_1)\
  .withColumn("col_w2_1", col_w2_1)\
  .withColumn("col_w1_2", col_w1_2)\
  .withColumn("col_w2_2", col_w2_2)\
  .explain()

== Physical Plan ==
Window [sum(col5#92L) windowspecdefinition(col3#90L, col4#91L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w2_2#147L], [col3#90L], [col4#91L ASC NULLS FIRST]
+- *(4) Sort [col3#90L ASC NULLS FIRST, col4#91L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(col3#90L, 200)
      +- Window [sum(col5#92L) windowspecdefinition(col1#88L, col2#89L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w1_2#143L], [col1#88L], [col2#89L ASC NULLS FIRST]
         +- *(3) Sort [col1#88L ASC NULLS FIRST, col2#89L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(col1#88L, 200)
               +- Window [max(col5#92L) windowspecdefinition(col3#90L, col4#91L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w2_1#145L], [col3#90L], [col4#91L ASC NULLS FIRST]
                  +- *(2) Sort [col3#90L ASC NULLS FIRST, col4#91L ASC NULLS FIRST], false, 0
                     +- Exchange hashpartitioning(col3#90L, 200)
                        +- Window [max(col5#92L) windowspecdefinition(col1#88L, col2#89L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w1_1#141L], [col1#88L], [col2#89L ASC NULLS FIRST]
                           +- *(1) Sort [col1#88L ASC NULLS FIRST, col2#89L ASC NULLS FIRST], false, 0
                              +- Exchange hashpartitioning(col1#88L, 200)
                                 +- Scan ExistingRDD[col1#88L,col2#89L,col3#90L,col4#91L,col5#92L]

选择 - 2 洗牌

如果所有列都使用 select 传递,则 DAG 是正确的。

df.select("*", *expr).explain()

== Physical Plan ==
Window [max(col5#92L) windowspecdefinition(col3#90L, col4#91L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w2_1#119L, sum(col5#92L) windowspecdefinition(col3#90L, col4#91L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w2_2#121L], [col3#90L], [col4#91L ASC NULLS FIRST]
+- *(2) Sort [col3#90L ASC NULLS FIRST, col4#91L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(col3#90L, 200)
      +- Window [max(col5#92L) windowspecdefinition(col1#88L, col2#89L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w1_1#115L, sum(col5#92L) windowspecdefinition(col1#88L, col2#89L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w1_2#117L], [col1#88L], [col2#89L ASC NULLS FIRST]
         +- *(1) Sort [col1#88L ASC NULLS FIRST, col2#89L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(col1#88L, 200)
               +- Scan ExistingRDD[col1#88L,col2#89L,col3#90L,col4#91L,col5#92L]

问题

有一些现有信息解释了为什么应该避免使用 withColumn,然而它们主要涉及多次调用 withColumn 并且没有解决 DAG 偏差问题(参见 这里这里)。 有人知道为什么 withColumnselect 之间的DAG不同吗?Spark的优化算法应该适用于任何情况,并且不应受到表达相同操作方式的不同方式的影响。

提前致谢。

3个回答

4
这似乎是由 withColumn 导致的内部投影结果。在Spark文档这里有详细说明。
官方建议遇到多列时,采用Jay建议的方法,改用选择语句(select)。

3

在使用嵌套的withColumns和窗口函数时应注意什么?

假设我想执行以下操作:

w1 = ...rangeBetween(-300, 0)
w2 = ...rowsBetween(-1,0)

(df.withColumn("some1", col(f.max("original1").over(w1))
   .withColumn("some2", lag("some1")).over(w2)).show()

即使使用非常小的数据集,我仍然遇到了许多内存问题和高溢出。如果我使用select而不是withColumn,它执行速度会更快。

df.select(
    f.max(col("original1")).over(w1).alias("some1"),
    f.lag("some1")).over(w2)
).show()

谢谢你的回答!我很感兴趣能够提供一个最小可重现的示例,以便观察到你所描述的内存问题和高溢出。 - pansen

0

对@Victor3y的回答进行扩展:

如果你熟悉SQL而不了解Spark的内部工作原理,那么关于withColumn的文档可能并不完全明显:

这个方法在内部引入了一个投影。因此,多次调用它,例如通过循环来添加多个列,可能会生成庞大的计划,从而导致性能问题...

一个可以形象化这种差异的方式是使用SQL:将withColumn视为将原始DF查询包装在子查询中。

-- df.withColumn("foo", foo).withColumn("bar", bar)
-- 

WITH df AS (...query for df before .withColumn...),
WITH df_with_foo as (select df.*, foo from df)
WITH df_with_bar as (select df_with_foo.*, bar from df_with_foo)
SELECT df_with_bar.* from df_with_bar

所以,在你原本会使用 SQL 子查询和 WITH 子句的情况下,使用 withColumn 是有道理的。但是,如果你只是想添加 N 个独立的列,使用 select 更合适。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接