df.repartition和DataFrameWriter partitionBy的区别是什么?

70

DataFrame.repartition()DataFrameWriter.partitionBy()方法有什么区别?

我希望两者都可以基于数据框列进行分区?还是说它们之间有区别?


1
对于任何来到这个问题的人,这个也可能是相关的。 - y2k-shubham
3个回答

234

注意: 我认为被接受的答案不太正确!我很高兴你问了这个问题,因为这些类似命名的函数的行为在重要和意外的方面有所不同,在官方Spark文档中没有得到很好的记录。

被接受答案的第一部分是正确的:调用 df.repartition(COL, numPartitions=k) 将使用基于哈希的分区器创建一个包含 k 个分区的数据帧。这里的 COL 定义分区键,可以是单列也可以是列列表。基于哈希的分区器将每个输入行的分区键哈希成一个 k 个分区的空间,类似于 partition = hash(partitionKey) % k。这保证了所有具有相同分区键的行都会进入同一个分区。但是,多个分区键的行也可能进入同一个分区(当分区键之间发生哈希冲突时),某些分区可能为空

总之,df.repartition(COL, numPartitions=k) 的不直观之处在于:

  • 分区不会严格隔离分区键
  • 你的 k 个分区中有些可能为空,而另一些则可能包含多个分区键的行

df.write.partitionBy 的行为与之相当不同,以一种许多用户不会预料的方式。假设你希望输出文件按日期分区,而你的数据跨越7天。还假设 df 最初有10个分区。当你运行 df.write.partitionBy('day') 时,你应该期望有多少个输出文件?答案是“取决于情况”。如果你的起始分区中的每个分区都包含每天的数据,则答案是70。如果你的起始分区中的每个分区都只包含一天的数据,则答案是10。

我们如何解释这种行为?当你运行df.write时,df中的每个原始分区都被独立地写入。也就是说,你原来的10个分区中的每一个都是根据“day”列单独进行子分区,并且每个子分区都会写入一个单独的文件。

我认为这种行为相当令人恼火,希望有一种方法在写入数据框时进行全局重分区。


5
为了更明确地解释partitionBy的示例,您可以将其想象为:按照分区、列1、列2进行分组。这将告诉您要写入多少个文件。 - Farah
8
非常好的回答,同时加50分赞赏“与官方Spark文档中不太清晰记录的重要且意想不到的差异。”我的问题是:是否有一种方法可以针对您在最后一句中所描述的进行修改?例如df.write().repartition(COL).partitionBy(COL)这样的方式?我想要的是partitionBy()的行为,但是大致上与我最初拥有的文件大小和数量相同。这个怎么轻松实现呢?以partitionBy(date) => 70 files的例子说明了相关性。我希望每天有约10个文件,对于那些有更多数据的日期可能会有2或3个文件。 - seth127
2
@seth127 - 我有一些想法,但需要一些空间来解释。请将您的问题写成正式问题,我会给您写一个答案。 - conradlee
2
@conradlee 好的,这是链接:https://dev59.com/c1UL5IYBdhLWcg3wI1PC 提前致谢! - seth127
3
假设你有1000天的数据,想要按照日期列进行分区。那么运行 df.repartition(df.date, 1000)。许多人期望每个分区恰好包含一天的数据。然而,其中有些分区将会是空的,而其他分区则可能包含多天的数据。这对许多人来说很难理解(也许你不觉得,因此产生了混淆)。 - conradlee
显示剩余7条评论

53
如果您运行repartition(COL),则在计算过程中更改了分区 - 您将获得spark.sql.shuffle.partitions(默认值:200)个分区。然后如果调用.write,您将获得一个包含多个文件的目录。
如果您运行.write.partitionBy(COL),那么结果是,您将获得与COL中唯一值相同的目录数。这加快了进一步的数据读取(如果按分区列进行过滤),并节省了存储空间(分区列已从数据文件中删除)。 更新:请参见@conradlee的答案。他详细解释了应用不同方法后目录结构的外观以及两种情况下最终文件数量。

28

repartition() 用于在内存中对数据进行分区,而 partitionBy 则用于在磁盘上对数据进行分区。它们通常结合使用。

无论是 repartition() 还是 partitionBy 都可以用于“基于 DataFrame 列对数据进行分区”,但是 repartition() 在内存中对数据进行分区,partitionBy 则在磁盘上对数据进行分区。

repartition()

让我们通过一些代码来更好地理解分区。假设你有以下 CSV 数据。

first_name,last_name,country
Ernesto,Guevara,Argentina
Vladimir,Putin,Russia
Maria,Sharapova,Russia
Bruce,Lee,China
Jack,Ma,China

df.repartition(col("country"))会按国家在内存中重新分区数据。

让我们将数据写出来,以便检查每个内存分区的内容。

val outputPath = new java.io.File("./tmp/partitioned_by_country/").getCanonicalPath
df.repartition(col("country"))
  .write
  .csv(outputPath)

以下是数据在磁盘上的存储方式:

partitioned_by_country/
  part-00002-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv
  part-00044-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv
  part-00059-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv
每个文件都包含单个国家的数据 - 例如,part-00059-95acd280-42dc-457e-ad4f-c6c73be6226f-c000.csv 文件包含了中国的数据:
Bruce,Lee,China
Jack,Ma,China

partitionBy()

使用partitionBy将数据写入磁盘,并查看文件系统输出的差异。

以下是将数据写入磁盘分区的代码。

val outputPath = new java.io.File("./tmp/partitionedBy_disk/").getCanonicalPath
df
  .write
  .partitionBy("country")
  .csv(outputPath)

这就是磁盘上的数据样式:

partitionedBy_disk/
  country=Argentina/
    part-00000-906f845c-ecdc-4b37-a13d-099c211527b4.c000.csv
  country=China/
    part-00000-906f845c-ecdc-4b37-a13d-099c211527b4.c000
  country=Russia/
    part-00000-906f845c-ecdc-4b37-a13d-099c211527b4.c000

为什么要在磁盘上分区数据?

在磁盘上进行数据分区可以使某些查询运行得更快。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接