Spark谓词下推性能

5

我有一些以日期为分区存储的Parquet文件,存储在以下目录中:

/activity
    /date=20180802

我使用的是Spark 2.2版本,有400多个分区。我的理解是谓词下推应该可以让我运行像下面这样的查询,并快速获得结果。

spark.read.parquet(".../activity")
    .filter($"date" === "20180802" && $"id" === "58ff800af2")
    .show()

然而,上面的查询花费了大约90秒的时间,而下面的查询仅需要大约5秒钟的时间。我是做错了什么还是这是正常行为?

spark.read.parquet(".../activity/date=20180802")
    .filter($"id" === "58ff800af2")
    .show()

1
你尝试过连续运行查询吗?第一次运行会慢很多,因为Spark需要扫描所有分区。此外,值得调用.explain()查看计划并验证筛选器是否确实按预期下推。 - Denis Makarenko
我最初误解了这个问题,太匆忙了。我在想谓词下推是否实际适用于这里。这不是SPARK SQL,而是spark.read非JDBC。 - thebluephantom
1
这是一个有趣的问题,SO上有一些有趣的陈述。我认为Denis已经给出了答案,因为你没有使用SQL语句而是使用了spark.read。话虽如此,人们会期望更智能的性能,但是... - thebluephantom
1
请注意给出的示例和Denis的答案,我认为这解释了它。Spark是否支持使用Parquet文件进行分区修剪? - thebluephantom
1
我认为我们在这里也谈到了分区修剪。 - thebluephantom
感谢@DenisMakarenko和thebluephantom的帮助。我运行了explain来验证筛选器是否被下推并且多次运行查询确实有所帮助。与将其包含在路径中相比,仍然在日期上进行过滤仍需要大约10秒钟,但这更符合我的预期。 - Duke Silver
2个回答

5
我也注意到了这一点,并在Spark峰会演示中谈论了它
Spark执行了一个昂贵的文件列表操作,这可能会使事情变慢。Spark非常擅长列出文件。我已经将Spark文件列表时间与AWS CLI进行了比较,不知道为什么Spark需要那么长时间来列出文件。
你应该重新表述“My understanding is that predicate pushdown...”为“我的理解是分区过滤器……”。谓词下推过滤是不同的。
这也是Delta数据湖的问题。使用Delta数据湖时情况更糟,因为您提到的避免文件列表的解决方法在Delta中无法使用。
简而言之,您没有做错任何事情,这是预期的行为。您只有400个分区,因此在您的情况下,不必要的文件列表并不那么糟糕。当您有20,000个分区时,请想象一下速度会变得多么缓慢!

0
尝试这个,看看谓词下推和分区剪枝是否起作用:

试一试,看看谓词下推和分区剪枝是否起作用:

val df = spark.read.parquet(".../activity")
df.filter($"date" === "20180802" && $"id" === "58ff800af2").explain(true)

在生成的物理计划中查找PushedFilter [...]和PartitionFilters [...]。这将告诉您第一部分不起作用的原因。但是我不确定如何解决它,因为我也遇到了类似而奇怪的问题,但尚未解决。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接