如何在Spark中使用repartition()指定文件大小

7

我正在使用pyspark,并且我有一个大型数据源,我想要显式地指定每个分区中的文件大小来重新分区。

我知道使用repartition(500)函数将把我的parquet文件分成500个几乎相等大小的文件。 问题是每天都会向这个数据源添加新数据。有些天可能有大量输入,有些天可能有较小的输入。因此,在一段时间内查看分区文件大小分布时,每个文件的大小在200KB到700KB之间变化。

我考虑指定每个分区的最大大小,以便每天获得更或多或少相同大小的文件,不管文件数目。 这将帮助我在稍后运行我的大型数据集的作业时避免执行时间和洗牌时间等方面的倾斜。

是否有一种方法可以在使用repartition()函数或将数据框写入parquet时指定它?

1个回答

5
你可以考虑使用参数maxRecordsPerFile来撰写结果。
storage_location = //...
estimated_records_with_desired_size = 2000
result_df.write.option(
     "maxRecordsPerFile", 
     estimated_records_with_desired_size) \
     .parquet(storage_location, compression="snappy")

2
但是为了做到这一点,我首先需要找出100MB文件中有多少条记录,然后将maxRecordsPerFile设置为正确的值,对吗? 难道没有直接指定文件最大大小的方法吗? - thentangler
1
你的理解是正确的。直接回答你的问题,目前还没有。在将内存中的DataFrame写入磁盘(或对象存储位置,如AWS S3)之前,需要对其进行编码和压缩,并且默认的持久化模式是StorageLevel.MEMORY_AND_DISK。简单地说,在结果完全写入磁盘之前,无法估计写入过程中文件的实际大小。 - Scott Hsieh
明白了。那么,我该如何找到仅有100MB数据的行数呢? - thentangler
谢谢。我问这个问题是为了估计当我尝试使用你在答案中提供的方法时,会有多少行数据进入分区。 你如何找到记录的大小?我尝试使用在这个论坛的一个答案中建议的getByte(df.head()),但它在pyspark中没有起作用。 - thentangler
假设你通过 .repartition(8) 写出了一个结果,抓住其中一个 8 个文件之一,假设你的结果中有 5 列且这些列是固定的,即无论生成多少分区(文件),总会有 5 列。 在这种情况下,你可以得到 5 列行的平均大小,然后可以暂时确定一个大致的 maxRecordsPerFile 数量。 核心思想是尽可能地接近你结果的数据分布。 - Scott Hsieh
显示剩余3条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接