我在Spark中有一个操作,需要对数据框架中的多个列执行。一般来说,有两种可能性来指定这样的操作:
- 硬编码
handleBias("bar", df)
.join(handleBias("baz", df), df.columns)
.drop(columnsToDrop: _*).show
- 从列名列表动态生成它们
var isFirst = true
var res = df
for (col <- columnsToDrop ++ columnsToCode) {
if (isFirst) {
res = handleBias(col, res)
isFirst = false
} else {
res = handleBias(col, res)
}
}
res.drop(columnsToDrop: _*).show
问题在于动态生成的DAG不同,当使用更多列时,动态解决方案的运行时间增加得更多,而硬编码操作的运行时间则相对较短。
我很好奇如何将动态构建的优雅性与快速执行时间相结合。
以下是示例代码的DAG比较
![complexity comparison](https://istack.dev59.com/C63sR.webp)
![hardCoded](https://istack.dev59.com/dUwQK.webp)
![hugeMessDynamic](https://istack.dev59.com/a7x9P.webp)
完成最小示例的代码:
def handleBias(col: String, df: DataFrame, target: String = "FOO"): DataFrame = {
val pre1_1 = df
.filter(df(target) === 1)
.groupBy(col, target)
.agg((count("*") / df.filter(df(target) === 1).count).alias("pre_" + col))
.drop(target)
val pre2_1 = df
.groupBy(col)
.agg(mean(target).alias("pre2_" + col))
df
.join(pre1_1, Seq(col), "left")
.join(pre2_1, Seq(col), "left")
.na.fill(0)
}
编辑
使用foldleft
运行您的任务会生成线性DAG ,为所有列硬编码函数会产生
。
两者都比我的原始DAG好得多,但是对我来说硬编码变体看起来更好。在Spark中连接SQL语句可以允许我动态生成硬编码执行图,但那似乎相当丑陋。您看到其他选项了吗?