我的数据集是这样分区的:
Year=yyyy
|---Month=mm
| |---Day=dd
| | |---<parquet-files>
在Spark中创建一个包含两个日期之间数据的DataFrame,最简单和高效的方法是什么?
我的数据集是这样分区的:
Year=yyyy
|---Month=mm
| |---Day=dd
| | |---<parquet-files>
在Spark中创建一个包含两个日期之间数据的DataFrame,最简单和高效的方法是什么?
// With full discovery
spark.read.parquet("hdfs:///basepath")
.where('Year === 2017 && (
('Month === 10 && 'Day >= 6') || ('Month === 11 && 'Day <= 3')
))
// With partial discovery
val df1 = spark.read.option("basePath", "hdfs:///basepath/")
.parquet("hdfs:///basepath/Year=2017/Month=10/Day={0[6-9], [1-3][0-9]}/*/")
val df2 = spark.read.option("basePath", "hdfs:///basepath/")
.parquet("hdfs:///basepath/Year=2017/Month=11/Day={0[1-3]}/*/")
val df = df1.union(df2)
确实可以编写通用代码,但我没有遇到过这种情况。更好的方法是按照我在评论中提出的方式进行分区。如果您的表使用类似于/basepath/ts=yyyymmddhhmm/*.parquet
的方式进行分区,则答案很简单:
按照评论中提到的方式进行分区即可,例如使用/basepath/ts=yyyymmddhhmm/*.parquet
对表进行分区。
spark.read.parquet("hdfs:///basepath")
.where('ts >= 201710060000L && 'ts <= 201711030000L)
把小时和分钟加进去是值得的原因在于你可以编写通用代码,处理时间间隔而无需关注数据按周、日、小时或每15分钟分区。实际上你甚至可以管理不同粒度的数据在同一个表里,例如旧数据被聚合到更高的层级以减少需要发现的总分区数。
编辑以添加多个加载路径来解决评论问题。
您可以使用正则表达式风格的语法。
val dataset = spark
.read
.format("parquet")
.option("filterPushdown", "true")
.option("basePath", "hdfs:///basepath/")
.load("hdfs:///basepath/Year=2017/Month=10/Day={0[6-9],[1-3][0-9]}/*/",
"hdfs:///basepath/Year=2017/Month=11/Day={0[1-3]}/*/")
如何使用正则表达式在sc.textFile中包含/排除某些输入文件?
注意:您不需要X=*
,如果想要所有的天、月等,只需使用*
。
您可能还应该阅读一些关于谓词下推(即上面设置为true的filterPushdown)的内容。
最后,您将注意到上面的basepath选项,其原因可以在此处找到:防止DataFrame.partitionBy()从模式中删除分区列
2017-10-06
至2017-11-03
之间的数据? - Sim
/tbl/ts=yyyymmddhhmm/*.parquet
。关于这个主题有一个章节在 https://spark-summit.org/east-2017/events/bulletproof-jobs-patterns-for-large-scale-spark-processing/ 上。 - Sim