如何防止谓词下推?

7

最近我在使用Spark和JDBC数据源进行工作。请考虑以下代码片段:

val df = spark.read.(options).format("jdbc").load();
val newDF = df.where(PRED)

PRED 是一系列谓词。

如果 PRED 是一个简单的谓词,比如 x = 10,查询速度会更快。然而,如果存在一些非等条件,比如 date > someOtherDate or date < someOtherDate2,查询速度会比没有谓词下推慢得多。你可能已经知道,数据库引擎对这样的谓词扫描非常慢,在我的情况下甚至慢了 10 倍(!)。

为了防止不必要的谓词下推,我使用了:

val cachedDF = df.cache()
val newDF = cachedDF.where(PRED)

但它需要大量的内存,并且由于在此处提到的问题 - Spark的数据集取消持久化行为 - 我不能取消持久化cachedDF

还有其他选项可以避免下推谓词吗?没有缓存,也没有编写自己的数据源?

注意:即使有关闭谓词下推的选项,它仅适用于其他查询可能仍然使用它的情况。因此,如果我写了:

// some fancy option set to not push down predicates
val df1 = ...
// predicate pushdown works again
val df2 = ...
df1.join(df2)// where df1 without predicate pushdown, but df2 with

2
话虽如此 - 简单的技巧是传递字段 udf[T, T](identity _)。强制转换也可以以类似的方式工作。例如,df.where(($"modifieddate" > "2010-05-28 00:00:00")),其中 modifieddate 是时间戳,不会被推送,而 df.where(($"modifieddate" > lit("2010-05-28 00:00:00").cast("timestamp"))) 将被推送。 - zero323
@user6910411,我在问题示例中添加了“OR”,因为在实际应用中,这个OR+非等式会导致DB2性能下降。 - T. Gawęda
@user6910411 很聪明! ;) 请写一个答案,它值得点赞 :) 或许我也会在Spark开发者邮件列表上发布一些内置的“屏障”相关内容。 - T. Gawęda
此外,如果条件是静态的,您可以将其中一些推入查询中,缓存结果,并对其余条件使用 where(我的意思是“推”https://dev59.com/iZnga4cB1Zd3GeqPZX19)。 - zero323
1
我觉得你没有理解我的重点。缓存的问题是它可能会完全禁用下推。但是,如果将某些谓词推入查询字符串中,Spark 就不会触及这一部分。但当然,这又是另一个 hack。 - zero323
显示剩余6条评论
1个回答

3

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接