Spark动态DAG比硬编码的DAG慢得多且不同。

4

我在Spark中有一个操作,需要对数据框架中的多个列执行。一般来说,有两种可能性来指定这样的操作:

  • 硬编码
handleBias("bar", df)
  .join(handleBias("baz", df), df.columns)
  .drop(columnsToDrop: _*).show
  • 从列名列表动态生成它们
var isFirst = true
var res = df
for (col <- columnsToDrop ++ columnsToCode) {
  if (isFirst) {
    res = handleBias(col, res)
    isFirst = false
  } else {
    res = handleBias(col, res)
  }
}
res.drop(columnsToDrop: _*).show

问题在于动态生成的DAG不同,当使用更多列时,动态解决方案的运行时间增加得更多,而硬编码操作的运行时间则相对较短。
我很好奇如何将动态构建的优雅性与快速执行时间相结合。
以下是示例代码的DAG比较 complexity comparison 对于大约80列,这导致硬编码变体的图形相当漂亮 hardCoded 而动态构建查询的DAG非常庞大,可能不太可并行化且速度较慢。 hugeMessDynamic 当前版本的Spark(2.0.2)使用了DataFrames和spark-sql
完成最小示例的代码:
def handleBias(col: String, df: DataFrame, target: String = "FOO"): DataFrame = {
  val pre1_1 = df
    .filter(df(target) === 1)
    .groupBy(col, target)
    .agg((count("*") / df.filter(df(target) === 1).count).alias("pre_" + col))
    .drop(target)

  val pre2_1 = df
    .groupBy(col)
    .agg(mean(target).alias("pre2_" + col))

  df
    .join(pre1_1, Seq(col), "left")
    .join(pre2_1, Seq(col), "left")
    .na.fill(0)
}

编辑

使用foldleft运行您的任务会生成线性DAG foldleft,为所有列硬编码函数会产生hardcoded

两者都比我的原始DAG好得多,但是对我来说硬编码变体看起来更好。在Spark中连接SQL语句可以允许我动态生成硬编码执行图,但那似乎相当丑陋。您看到其他选项了吗?


我认为问题在于你的“handleBias”函数非常复杂,而且你需要对多列运行它。即使你为许多列硬编码,你的DAG也会很大,所以问题可能不是“动态”应用,而是应用于许多列。因此,如果你能想到一种方法来使你的函数能够同时处理多个列,这可能会有很大帮助。 - Daniel de Paula
@DanieldePaula,你看有没有办法以更简单的方式表达这个方法,以减少计算量? - Georg Heiler
很不幸,我现在没有太多时间去考虑它,非常抱歉。如果明天你还没有找到解决方案,我会看一下的。 - Daniel de Paula
@DanieldePaula 我目前还没有想出简化的方法。不知道缓存是否可以改进?目前,在处理更大的数据集之前,我会使用缓存来调用此函数。 - Georg Heiler
@DanieldePaula,你认为我能否通过“连接”列来减少一些连接?https://dev59.com/aVwY5IYBdhLWcg3wNFdY 因为这里需要进行并行的连接操作(可以执行串联操作)。 - Georg Heiler
如果您使用窗口函数,就可以摆脱连接,就像我在我的回答中展示的那样。您想要的连接概念在 Spark 中没有意义(正如您在发送的链接中所看到的)。 - Daniel de Paula
1个回答

3

编辑 1:从handleBias函数中删除了一个窗口函数,并将其转换为广播连接。

编辑 2:更改了空值替换策略。

我有一些建议可以改进你的代码。首先,对于“handleBias”函数,我建议使用窗口函数和“withColumn”调用来完成,避免使用连接:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

def handleBias(df: DataFrame, colName: String, target: String = "foo") = {
  val w1 = Window.partitionBy(colName)
  val w2 = Window.partitionBy(colName, target)
  val result = df
    .withColumn("cnt_group", count("*").over(w2))
    .withColumn("pre2_" + colName, mean(target).over(w1))
    .withColumn("pre_" + colName, coalesce(min(col("cnt_group") / col("cnt_foo_eq_1")).over(w1), lit(0D)))
    .drop("cnt_group")
  result
}

然后,如果要对多列进行调用,我建议使用foldLeft,这是解决这种问题的“函数式”方法:

val df = Seq((1, "first", "A"), (1, "second", "A"),(2, "noValidFormat", "B"),(1, "lastAssumingSameDate", "C")).toDF("foo", "bar", "baz")

val columnsToDrop = Seq("baz")
val columnsToCode = Seq("bar", "baz")
val target = "foo"

val targetCounts = df.filter(df(target) === 1).groupBy(target)
  .agg(count(target).as("cnt_foo_eq_1"))
val newDF = df.join(broadcast(targetCounts), Seq(target), "left")

val result = (columnsToDrop ++ columnsToCode).toSet.foldLeft(df) {
  (currentDF, colName) => handleBias(currentDF, colName)
}

result.drop(columnsToDrop:_*).show()

+---+--------------------+------------------+--------+------------------+--------+
|foo|                 bar|           pre_baz|pre2_baz|           pre_bar|pre2_bar|
+---+--------------------+------------------+--------+------------------+--------+
|  2|       noValidFormat|               0.0|     2.0|               0.0|     2.0|
|  1|lastAssumingSameDate|0.3333333333333333|     1.0|0.3333333333333333|     1.0|
|  1|              second|0.6666666666666666|     1.0|0.3333333333333333|     1.0|
|  1|               first|0.6666666666666666|     1.0|0.3333333333333333|     1.0|
+---+--------------------+------------------+--------+------------------+--------+

我不确定它会大幅改善你的DAG,但至少它会使代码更清晰易读。
参考资料:
- Databricks关于Window函数的文章:https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html - 可用函数的API文档:http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$ - foldLeft: https://coderwall.com/p/4l73-a/scala-fold-foldleft-and-foldright

1
非常感谢您提供这个出色的答案。我仍需要在更大的数据上进行测试。请查看编辑/您的代码生成的针对硬编码和foldleft操作的2个不同DAG。为什么它们不是“相同”的呢? - Georg Heiler
@GeorgHeiler它们之间的不同之处在于您硬编码版本使用连接(join),这通常更糟。线性DAG意味着没有连接(join)涉及其中,而且我认为它比另一个版本更好看。在您尝试更多数据后,请告诉我哪个更快。 - Daniel de Paula
.withColumn("pre_" + colName, coalesce(col("cnt_group") / col("cnt_foo_eq_1"), lit(0D))) 不是我想要实现的,我想要替换所有空值为class==1相应的值,而不是用0来代替。例如,对于A&foo=0的所有空值,将使用A&foo=1的0.5作为替换。 - Georg Heiler
@GeorgHeiler,Spark中的并行处理是按行而不是按列进行的,因此恐怕它不会并行处理,因为在这种情况下,Spark每列至少需要一个洗牌操作。 - Daniel de Paula
@GeorgHeiler 关于你遇到的问题,我认为我可能知道原因。在你的“目标”列中,是否有许多不同的值,每个值只有几行,或者只有少数不同的值,每个值都包含很多行? - Daniel de Paula
显示剩余7条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接