为什么在类型化的 Dataset API 中不使用谓词下推(predicate pushdown)(相对于非类型化的 DataFrame API)?

15

我一直以为数据集/数据框架API是相同的,唯一的区别是数据集API会在编译时提供安全性。对吗?

所以,我有一个非常简单的案例:

 case class Player (playerID: String, birthYear: Int)

 val playersDs: Dataset[Player] = session.read
  .option("header", "true")
  .option("delimiter", ",")
  .option("inferSchema", "true")
  .csv(PeopleCsv)
  .as[Player]

 // Let's try to find players born in 1999. 
 // This will work, you have compile time safety... but it will not use predicate pushdown!!!
 playersDs.filter(_.birthYear == 1999).explain()

 // This will work as expected and use predicate pushdown!!!
 // But you can't have compile time safety with this :(
 playersDs.filter('birthYear === 1999).explain()

从第一个示例可以看出,它没有进行谓词下推(注意空的PushedFilters):

== Physical Plan ==
*(1) Filter <function1>.apply
+- *(1) FileScan csv [...] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:People.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<playerID:string,birthYear:int,birthMonth:int,birthDay:int,birthCountry:string,birthState:s...

第二个样本将正确执行(请注意PushedFilters):
== Physical Plan ==
*(1) Project [.....]
+- *(1) Filter (isnotnull(birthYear#11) && (birthYear#11 = 1999))
   +- *(1) FileScan csv [...] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:People.csv], PartitionFilters: [], PushedFilters: [IsNotNull(birthYear), EqualTo(birthYear,1999)], ReadSchema: struct<playerID:string,birthYear:int,birthMonth:int,birthDay:int,birthCountry:string,birthState:s...

所以问题是...我如何使用DS Api,并在编译时确保安全,同时让谓词下推按预期工作?是否可能?如果不是...这是否意味着DS api为您提供了编译时安全性...但代价是性能!?(在处理大型parquet文件时,DF将更快)

可能是Spark 2.0 Dataset vs DataFrame的重复问题。 - Alper t. Turker
1个回答

24

这是你的物理计划中应该记住的一行,以了解Dataset[T]DataFrame(即Dataset[Row])之间的真正区别。

Filter <function1>.apply

我一直建议人们远离类型化的Dataset API,继续使用未经类型化的DataFrame API,因为在太多地方,Scala代码会成为优化器的黑匣子。你刚刚碰到了其中之一,也要考虑Spark SQL将所有对象反序列化并远离JVM以避免GC的情况。每次触及对象时,你实际上要求Spark SQL对对象进行反序列化并将它们加载到JVM上,这会对GC造成很大压力(与未经类型化的DataFrame API相比,使用类型化的Dataset API会更频繁地触发GC)。
参见UDFs are Blackbox — Don’t Use Them Unless You’ve Got No Choice

引用Reynold Xin在dev@spark.a.o邮件列表上问同样的问题后所说的话:

UDF是一个黑匣子,因此Spark无法知道它正在处理什么。有一些简单的情况,我们可以分析UDF的字节码并推断出它在做什么,但通常情况下这很难做到。

有一个JIRA票据来处理这种情况 SPARK-14083 分析JVM字节码并将闭包转换为Catalyst表达式,但正如某人所说(我想这是Adam B.在Twitter上说的),期望它很快出现是一种玩笑。

Dataset API的一个重要优点是类型安全,代价是性能较差,因为它严重依赖于用户定义的闭包/lambda函数。这些闭包通常比表达式慢,因为我们有更多的灵活性来优化表达式(已知数据类型,没有虚函数调用等)。在许多情况下,实际上并不难查看这些闭包的字节码并弄清楚它们试图做什么。如果我们能够理解它们,那么我们就可以直接将它们转化为Catalyst表达式以进行更优化的执行。


// Let's try to find players born in 1999. 
// This will work, you have compile time safety... but it will not use predicate pushdown!!!
playersDs.filter(_.birthYear == 1999).explain()

上面的代码等同于以下内容:

val someCodeSparkSQLCannotDoMuchOutOfIt = (p: Player) => p.birthYear == 1999
playersDs.filter(someCodeSparkSQLCannotDoMuchOutOfIt).explain()

someCodeSparkSQLCannotDoMuchOutOfIt 这正是你放弃优化并让Spark Optimizer跳过它的地方。


我还没有使用过它,但也许frameless会有所帮助。手册中有一部分是关于这个特定问题的,可以参考这里 - pls
刚才,我也遇到了谓词下推不起作用的问题,使用 Typed Dataset API。感谢解释。所以你是说使用 Typed Dataset API 对于 SparkSQL 引擎来说就是个黑盒,这是否意味着它们中没有任何优化? - thedevd
1
不是“没有优化”,而只是一些。 - Jacek Laskowski

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接