Spark DataSet筛选性能

8

我一直在尝试不同的方法来过滤输入的数据集。结果表明,性能可能会有很大差异。

该数据集是基于1.6 GB的33列和4226047行数据创建的。数据集通过加载csv数据并映射到一个case类来创建。

val df = spark.read.csv(csvFile).as[FireIncident]

对 UnitId = 'B02' 的筛选应该返回 47980 行。我测试了以下三种方法: 1)使用类型化列(在本地主机上约为 500 毫秒)

df.where($"UnitID" === "B02").count()

2) 使用临时表和SQL查询(与选项1几乎相同)

df.createOrReplaceTempView("FireIncidentsSF")
spark.sql("SELECT * FROM FireIncidentsSF WHERE UnitID='B02'").count()

3) 使用强类型类字段(14,987ms,即慢30倍)

df.filter(_.UnitID.orNull == "B02").count()

我用Python API再次测试了相同的数据集,时间为17,046毫秒,与Scala API选项3的性能相当。

df.filter(df['UnitID'] == 'B02').count()

能否有人解释一下第三个选项和python API与前两个选项执行方式的不同之处?

2个回答

10

这是由于第三步在这里

在前两步中,Spark不需要反序列化整个Java/Scala对象 - 它只查看一个列并继续执行。

在第三步中,由于您使用了lambda函数,Spark无法确定您只想要一个字段,因此它会为每一行从内存中提取所有33个字段,以便您可以检查一个字段。

我不确定第四步为什么如此缓慢。它似乎应该与第一步的方式相同。


非常有见地的回答。如果你在Java中写了Dataset<Row>,并且加上了这段代码:datasetRdd.filter(r -> r.<String>getAs("event_type_id").equals("LOG")),会发生什么? - Hedrack
@DusanVasiljevic,只要您使用lambda,就可以保留类型,但是您必须对其执行无类型操作以避免加载到内存中。 - Joan
链接已失效,这是一个新的链接。还有一个相关的Jira,最终决定不予实施。 - Joshua Chen

0
当运行Python时,首先将您的代码加载到JVM中进行解释,然后最终编译为字节码。使用Scala API时,Scala本身在JVM上本地运行,因此您可以省略将Python代码加载到JVM中的整个过程。

Python API和Scala API过滤器使用强类型类字段具有可比的性能结果。您知道为什么选项3)比1)或2)慢30倍吗? - YPL

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接