使用日期范围对分区数据进行Spark SQL查询

13

我的数据集是这样分区的:

Year=yyyy
 |---Month=mm
 |   |---Day=dd
 |   |   |---<parquet-files>

在Spark中创建一个包含两个日期之间数据的DataFrame,最简单和高效的方法是什么?


如果您想在分区上进行轻松的范围查询,最好的解决方案是使用更好的分区策略,其中单个轴上的时间为例,例如 /tbl/ts=yyyymmddhhmm/*.parquet。关于这个主题有一个章节在 https://spark-summit.org/east-2017/events/bulletproof-jobs-patterns-for-large-scale-spark-processing/ 上。 - Sim
2个回答

13
如果您坚持使用这种分区策略,那么答案取决于您是否愿意承担分区发现成本。
如果您愿意让Spark发现所有分区(仅需要一次,直到添加新文件),则可以加载基本路径,然后使用分区列进行过滤。
如果您不想让Spark发现所有分区,例如因为您有数百万个文件,则唯一有效的通用解决方案是将要查询的区间分成几个子区间,使用@r0bb23的方法轻松查询,然后将它们组合在一起。
如果以上两种情况都想要最好的结果,并且您有一个稳定的模式,则可以通过定义外部分区表在元存储中注册分区。如果您期望模式会发生改变,请不要这样做,因为元存储管理的表在此时管理模式演变得很差。
例如,要查询2017年10月6日到2017年11月3日之间的内容,您可以执行以下操作:
// With full discovery
spark.read.parquet("hdfs:///basepath")
  .where('Year === 2017 && (
    ('Month === 10 && 'Day >= 6') || ('Month === 11 && 'Day <= 3')
  ))

// With partial discovery
val df1 = spark.read.option("basePath", "hdfs:///basepath/")
  .parquet("hdfs:///basepath/Year=2017/Month=10/Day={0[6-9], [1-3][0-9]}/*/")
val df2 = spark.read.option("basePath", "hdfs:///basepath/")
  .parquet("hdfs:///basepath/Year=2017/Month=11/Day={0[1-3]}/*/")
val df = df1.union(df2)

确实可以编写通用代码,但我没有遇到过这种情况。更好的方法是按照我在评论中提出的方式进行分区。如果您的表使用类似于/basepath/ts=yyyymmddhhmm/*.parquet的方式进行分区,则答案很简单:

按照评论中提到的方式进行分区即可,例如使用/basepath/ts=yyyymmddhhmm/*.parquet对表进行分区。
spark.read.parquet("hdfs:///basepath")
  .where('ts >= 201710060000L && 'ts <= 201711030000L)

把小时和分钟加进去是值得的原因在于你可以编写通用代码,处理时间间隔而无需关注数据按周、日、小时或每15分钟分区。实际上你甚至可以管理不同粒度的数据在同一个表里,例如旧数据被聚合到更高的层级以减少需要发现的总分区数。


1
由于Spark在保存为Parquet格式时为每个分区创建一个文件夹:您最后的通用建议不会创建大量文件夹(每分钟一个),这难道不会成为操作系统的问题(I/O /资源方面)吗? (在基于Unix的系统上,我认为有时需要通过ulimit进行调整)。顺便说一句,感谢您的出色回答。 - Aydin K.
1
@AydinK。关于你的问题,我有两个想法。首先,具有分辨率到分钟级别的能力并不意味着将分区划分为单个分钟级别是有意义的。我听说生产中最小的是15分钟,即最后两位数为“00”、“15”、“30”和“45”。其次,使用标准文件系统处理大数据是不寻常的。大多数生产环境使用类似HDFS或基于云的对象存储(如AWS S3)的东西,可以处理非常大量的对象。 - Sim

5

编辑以添加多个加载路径来解决评论问题。

您可以使用正则表达式风格的语法。

val dataset = spark
  .read
  .format("parquet")
  .option("filterPushdown", "true")
  .option("basePath", "hdfs:///basepath/")
  .load("hdfs:///basepath/Year=2017/Month=10/Day={0[6-9],[1-3][0-9]}/*/",
    "hdfs:///basepath/Year=2017/Month=11/Day={0[1-3]}/*/")

如何使用正则表达式在sc.textFile中包含/排除某些输入文件?

注意:您不需要X=*,如果想要所有的天、月等,只需使用*

您可能还应该阅读一些关于谓词下推(即上面设置为true的filterPushdown)的内容。

最后,您将注意到上面的basepath选项,其原因可以在此处找到:防止DataFrame.partitionBy()从模式中删除分区列


这不是解决问题的通用方法。事实上,使用此分区策略查询日期间隔没有简单的通用解决方案。例如,您如何使用此方法查询2017-10-062017-11-03之间的数据? - Sim
你下面的回答中有一些好的信息。但是你不需要在你的回答中展示union(请参见上面的编辑)。所以我必须说,我认为它比你给予的信用更具普适性,尽管它需要一些不太美观的辅助函数。但对于许多系统来说,这是值得的。因为正如你承认的那样,在规模上进行分区发现并不便宜。部分发现在规模上更好。虽然我同意,更好的分区策略会有所帮助。我使用的是类似于你下面的代码的东西,使得辅助函数和上面的代码都变得微不足道。 - Robert Beatty

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接