非分区Parquet数据上的谓词下推

4

我在S3中有一个包含Parquet数据的文件夹:

bucket_name/folder_name/年份/月份/日期

eg:
s3://bucket_name/folder_name/2020/12/10

我正在使用Apache SparkAWS EMR上读取parquet文件。

由于数据没有分区,有没有不用分区实现谓词下推过滤器的方法?

有哪些性能提升的最佳实践可以使用。


好的,我写了一个答案来识别您的分区。但是,如果您需要单独读取分区,则在这种情况下唯一的方法是像这样读取:s3://bucket_name/folder_name/2020//** 或 *s3://bucket_name/folder_name/2020/12/**等等... - Kafels
4个回答

4
我将在代码中描述我的解决方案:
import pyspark.sql.functions as f
from pyspark.shell import spark

# Read absolute path and put "/*/*/*" to read all partitions
df = spark.read.parquet("s3://bucket_name/folder_name/*/*/*")

# Get absolute file path
df = df.withColumn('path', f.split(f.input_file_name(), '/'))

# Slice path and recover year / month / day in an array
df = df.withColumn('year_month_day', f.slice(f.col('path'), -4, 3))

# Transform array values to respective columns
df = df.withColumn('year', f.col('year_month_day').getItem(0))
df = df.withColumn('month', f.col('year_month_day').getItem(1))
df = df.withColumn('day', f.col('year_month_day').getItem(2))

# Drop temporary columns
df = df.drop('path', 'year_month_day')

df.show()

# TODO : Make your transformations
# .
# .
# .
# Save partitioned by year, month and day (if you want)
# df.write.partitionBy('year', 'month', 'day').parquet('...')

我的目录:

目录

输出结果:

+--------+--------+----+-----+---+
|column_a|column_b|year|month|day|
+--------+--------+----+-----+---+
| hello_1| hello_2|2019|   06| 10|
| world_1| world_2|2020|   12| 31|
+--------+--------+----+-----+---+

感谢您关注此事,这种方法将重写数据。我正在尝试在现有文件夹结构中实现谓词下推。 - bob
是的,我误读了你的问题并给出了这个答案,但如果你看一下我在你的问题下面的评论,我写了唯一的谓词下推方法。 - Kafels

1
  1. 您可以使用每个日期级别子文件夹的路径手动添加分区。这样,您就不必重新编写表格,但是您的Metastore将拥有大量分区条目,这可能会降低查询启动速度。
ALTER TABLE table_name ADD [IF NOT EXISTS]
  PARTITION
  (partition_col1_name = partition_col1_value
  [,partition_col2_name = partition_col2_value]
  [,...])
  [LOCATION 'location1']
  [PARTITION
  (partition_colA_name = partition_colA_value
  [,partition_colB_name = partition_colB_value
  [,...])]
  [LOCATION 'location2']
  [,...]

语法:https://docs.aws.amazon.com/athena/latest/ug/alter-table-add-partition.html

  1. 您可以将表转换为开源Delta Lake格式(http://delta.io
-- Convert unpartitioned parquet table at path 'path/to/table'
CONVERT TO DELTA parquet.`path/to/table`

这将会添加一个./_delta_log文件夹和一个较小的Delta Lake事务日志。Spark将利用存储在Delta日志中的最大最小值来确定要跳过哪些文件。这样可以跳过您不感兴趣的日期(也可以更广泛地使用)。参见线程:https://delta-users.slack.com/archives/CJ70UCSHM/p1602189649142400?thread_ts=1602098197.114400&cid=CJ70UCSHM 您需要在作业中包含Delta Lake Spark软件包,这样您就可以获得ACID属性和更多功能。
您可以从EMR迁移到Databricks以访问更多性能提升。

0

Spark还可以使用过滤器push downparquets,即使数据没有按特定谓词进行分区。但是,如果您的数据以一种能够帮助理解您请求的数据是否在parquet中的方式组织,则大多数情况下会从中受益。

例如,假设您有一个日期列,并且您没有按日期进行分区。结果是您有许多具有不同日期的文件,并且您正在查询特定日期,因此spark和parquet将在扫描/加载数据时过滤此日期。如果您根据此日期对数据进行排序,这将有所帮助,这样您将能够将较少的文件加载到内存中(因为较少的文件符合此要求的过滤器推送)。

您的问题非常普遍,取决于用例。


0

你不能将文件夹结构重命名以分区数据吗?

我相信如果你将文件夹重命名为:

s3://bucket_name/folder_name/year=2020/month=12/day=10

你可以这样做:
spark.read.parquet(" s3://bucket_name/folder_name/")

最终生成的数据框将按年/月/日进行分区


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接