为什么repartition()方法会增加磁盘上的文件大小?

29
我正在处理的数据湖(df)有 2 TB 的数据和 20,000 个文件。 我想将数据集压缩成 2,000 个 1 GB 文件。如果您运行 df.coalesce(2000) 并写入磁盘,则数据湖包含 1.9 TB 的数据。如果您运行 df.repartition(2000) 并写入磁盘,则数据湖包含 2.6 TB 的数据。repartition() 数据湖中的每个文件都比预期大 0.3 GB(它们都是 1.3 GB 文件而不是 1 GB 文件)。为什么 repartition() 方法会增加整个数据湖的大小?有一个相关的问题讨论了聚合后数据湖大小增加的原因。答案说:“通常,像 Parquet 这样的列式存储格式对数据分布(数据组织)和单个列的基数非常敏感。数据组织得越好,基数越低,存储效率就越高。” coalesce() 算法是否提供更有组织的数据...我认为不是...我不认为其他问题回答了我的问题。

可能相关:https://dev59.com/ylwZ5IYBdhLWcg3wTOzr - OmG
1
还有一个相关编程问题:为什么Spark Parquet文件的聚合大小比原始值大? - 10465355
你能统计一下在coalsec(2000)中生成的文件数量吗?如果少于2000,我们就可以得出一些结论。 - loneStar
@Achyuth - coalsece(2000) 生成了2000个文件。 - Powers
@Powers 重新分区可能会生成相等大小的文件,而合并可能会生成随机大小的文件。 - loneStar
显示剩余3条评论
2个回答

24

免责声明:

该答案主要包含猜测性质的内容。详细说明该现象可能需要对输入和输出(或至少它们各自的元数据)进行深入分析。

观察结果:

  1. 熵有效地限制了最强无损压缩的性能 - Wikipedia - 熵 (信息论)
  2. 无论是持久化列格式还是内部Spark SQL表示都会透明地应用不同的压缩技术(如游程编码字典编码)来减少存储数据的内存占用。

    此外,在磁盘上的文件格式(包括纯文本数据)可以使用通用的压缩算法显式压缩,目前不清楚这是否是此处的情况。

  3. 压缩(显式或透明)应用于数据块(通常是分区,但可以使用更小的单元)。

  4. 基于观察结果1),2)和3),我们可以假设平均压缩率将取决于群集中数据的分布。还应注意,如果上游谱系包含广泛的转换,则最终结果可能是非确定性的。

coalesce vs. repartition的可能影响:

通常情况下,coalesce有两种路径:

  • 通过管道升级到源 - 最常见的情况。
  • 传播到最近的Shuffle。

在第一种情况下,我们可以期望压缩率与输入的压缩率相当。但是,有些情况下,最终输出可以实现更小。让我们想象一个退化的数据集:

val df = sc.parallelize(
  Seq("foo", "foo", "foo", "bar", "bar", "bar"),
  6 
).toDF
如果像这样的数据集被写入磁盘,就没有压缩的可能性 - 每个值都必须按原样写入:
df.withColumn("pid", spark_partition_id).show
+-----+---+
|value|pid|
+-----+---+
|  foo|  0|
|  foo|  1|
|  foo|  2|
|  bar|  3|
|  bar|  4|
|  bar|  5|
+-----+---+

换句话说,我们大约需要6 * 3个字节,总共18个字节。

但是如果我们合并

df.coalesce(2).withColumn("pid", spark_partition_id).show
+-----+---+
|value|pid|
+-----+---+
|  foo|  0|
|  foo|  0|
|  foo|  0|
|  bar|  1|
|  bar|  1|
|  bar|  1|
+-----+---+

例如,我们可以使用小整数作为计数来应用RLE,并存储每个3 + 1个字节的分区,总共为8个字节。

当然,这只是一个极度简化的例子,但展示了如何保留低熵输入结构并合并块可以降低内存占用。

第二种coalesce场景不太明显,但在一些情况下上游进程可能会降低熵(例如窗口函数),保留这种结构将有益处。

repartition呢?

如果没有分区表达式,则repartition应用RoundRobinPartitioning(使用基于分区ID的伪随机键实现的HashPartitioning)。只要哈希函数表现得合理,这种重新分配应该最大化数据的熵,从而降低可能的压缩率。

结论:

coalesce单独使用时不应提供任何特定的好处,但可以保留数据分布的现有属性-在某些情况下,这种属性可能是有利的。

repartition由于其本质,平均而言会使情况变得更糟,除非数据的熵已经最大化(在非平凡数据集上可能存在改善情况,但这种情况高度不太可能)。

最后,使用分区表达式或repartitionByRange重新分区应该降低熵并提高压缩率。

注意:

我们还应该记住,列式格式通常基于运行时统计信息来决定特定的压缩/编码方法(或其缺乏)。因此,即使特定块中的行集固定,但行的顺序发生变化,我们也可以观察到不同的结果。


当您说要通过管道升级到源时,您是指 load().map().coalesce(1) 变成 load().coalesce(1).map() 吗?您能否举个例子或详细说明您所说的“传播到最近的洗牌”是什么意思? - girip11

1

我同意@10465355的回答。这里我有一个极端例子。

数据处理

有一个名为table_a的表,其所有列都是字符串。它的存储格式是Orc,并且是由以下程序生成的:

insert overwrite table table_a
select a,b,...,i
from table_other
group by a,b,...,i

在 HashAggregate 操作之后,table_a 表中的数据已经足够有组织了。特别是第一列 a。Orc 文件大小为 6.97 MB。(实际上还有一个小文件,大小为 2.09 KB,但我后来忽略了它。)

接下来,我们对 table_a 进行了 repartition

val querydf = spark.sql("""select *
    from table_a distribute by rand()""").repartition(1)

querydf.createOrReplaceTempView("tmpTable")

spark.sql("""insert overwrite table table_a 
select a,b,...,i
from tmpTable""")

numpartitions=1 时,Random(hashing.byteswap32(index)).nextInt(numPartitions) 不会触发随机重分配。因此,我们添加 distribute by rand() 相当于 repartition(n),并获得一个大小为14.26 MB的文件。

结果

我们可以使用 hive --orcfiledump 获取orc文件的文件结构。

repartition 之前:

Stripes:
  Stripe: offset: 3 data: 7288854 rows: 668265 tail: 354 index: 13637
    Stream: column 0 section ROW_INDEX start: 3 length 50
    Stream: column 1 section ROW_INDEX start: 53 length 1706
    Stream: column 2 section ROW_INDEX start: 1759 length 672
    Stream: column 3 section ROW_INDEX start: 2431 length 2297
    Stream: column 4 section ROW_INDEX start: 4728 length 1638
    Stream: column 5 section ROW_INDEX start: 6366 length 1270
    Stream: column 6 section ROW_INDEX start: 7636 length 1887
    Stream: column 7 section ROW_INDEX start: 9523 length 1823
    Stream: column 8 section ROW_INDEX start: 11346 length 1120
    Stream: column 9 section ROW_INDEX start: 12466 length 1174
    Stream: column 1 section DATA start: 13640 length 209662
    Stream: column 1 section LENGTH start: 223302 length 1158
    Stream: column 1 section DICTIONARY_DATA start: 224460 length 231328
    Stream: column 2 section DATA start: 455788 length 29861
    Stream: column 2 section LENGTH start: 485649 length 5
    Stream: column 2 section DICTIONARY_DATA start: 485654 length 33
    Stream: column 3 section DATA start: 485687 length 424936
    Stream: column 3 section LENGTH start: 910623 length 4069
    Stream: column 3 section DICTIONARY_DATA start: 914692 length 41298
    Stream: column 4 section DATA start: 955990 length 443602
    Stream: column 4 section LENGTH start: 1399592 length 4122
    Stream: column 4 section DICTIONARY_DATA start: 1403714 length 56217
    Stream: column 5 section DATA start: 1459931 length 475983
    Stream: column 5 section LENGTH start: 1935914 length 2650
    Stream: column 5 section DICTIONARY_DATA start: 1938564 length 17798
    Stream: column 6 section DATA start: 1956362 length 480891
    Stream: column 6 section LENGTH start: 2437253 length 4230
    Stream: column 6 section DICTIONARY_DATA start: 2441483 length 27873
    Stream: column 7 section DATA start: 2469356 length 2716359
    Stream: column 7 section LENGTH start: 5185715 length 304679
    Stream: column 8 section DATA start: 5490394 length 438723
    Stream: column 8 section LENGTH start: 5929117 length 58072
    Stream: column 8 section DICTIONARY_DATA start: 5987189 length 424961
    Stream: column 9 section DATA start: 6412150 length 630248
    Stream: column 9 section LENGTH start: 7042398 length 1455
    Stream: column 9 section DICTIONARY_DATA start: 7043853 length 258641
    Encoding column 0: DIRECT
    Encoding column 1: DICTIONARY_V2[48184]
    Encoding column 2: DICTIONARY_V2[3]
    Encoding column 3: DICTIONARY_V2[4252]
    Encoding column 4: DICTIONARY_V2[4398]
    Encoding column 5: DICTIONARY_V2[4404]
    Encoding column 6: DICTIONARY_V2[5553]
    Encoding column 7: DIRECT_V2
    Encoding column 8: DICTIONARY_V2[105667]
    Encoding column 9: DICTIONARY_V2[60943]

重新分区后:

Stripes:
  Stripe: offset: 3 data: 14940022 rows: 668284 tail: 344 index: 12312
    Stream: column 0 section ROW_INDEX start: 3 length 50
    Stream: column 1 section ROW_INDEX start: 53 length 1755
    Stream: column 2 section ROW_INDEX start: 1808 length 678
    Stream: column 3 section ROW_INDEX start: 2486 length 1815
    Stream: column 4 section ROW_INDEX start: 4301 length 1297
    Stream: column 5 section ROW_INDEX start: 5598 length 1217
    Stream: column 6 section ROW_INDEX start: 6815 length 1841
    Stream: column 7 section ROW_INDEX start: 8656 length 1330
    Stream: column 8 section ROW_INDEX start: 9986 length 1289
    Stream: column 9 section ROW_INDEX start: 11275 length 1040
    Stream: column 1 section DATA start: 12315 length 4260547
    Stream: column 1 section LENGTH start: 4272862 length 15955
    Stream: column 2 section DATA start: 4288817 length 102153
    Stream: column 2 section LENGTH start: 4390970 length 5
    Stream: column 2 section DICTIONARY_DATA start: 4390975 length 33
    Stream: column 3 section DATA start: 4391008 length 1033345
    Stream: column 3 section LENGTH start: 5424353 length 4069
    Stream: column 3 section DICTIONARY_DATA start: 5428422 length 41298
    Stream: column 4 section DATA start: 5469720 length 1044769
    Stream: column 4 section LENGTH start: 6514489 length 4122
    Stream: column 4 section DICTIONARY_DATA start: 6518611 length 56217
    Stream: column 5 section DATA start: 6574828 length 1142805
    Stream: column 5 section LENGTH start: 7717633 length 2650
    Stream: column 5 section DICTIONARY_DATA start: 7720283 length 17798
    Stream: column 6 section DATA start: 7738081 length 1147888
    Stream: column 6 section LENGTH start: 8885969 length 4230
    Stream: column 6 section DICTIONARY_DATA start: 8890199 length 27873
    Stream: column 7 section DATA start: 8918072 length 1705640
    Stream: column 7 section LENGTH start: 10623712 length 208184
    Stream: column 7 section DICTIONARY_DATA start: 10831896 length 1525605
    Stream: column 8 section DATA start: 12357501 length 513225
    Stream: column 8 section LENGTH start: 12870726 length 58100
    Stream: column 8 section DICTIONARY_DATA start: 12928826 length 424905
    Stream: column 9 section DATA start: 13353731 length 1338510
    Stream: column 9 section LENGTH start: 14692241 length 1455
    Stream: column 9 section DICTIONARY_DATA start: 14693696 length 258641
    Encoding column 0: DIRECT
    Encoding column 1: DIRECT_V2
    Encoding column 2: DICTIONARY_V2[3]
    Encoding column 3: DICTIONARY_V2[4252]
    Encoding column 4: DICTIONARY_V2[4398]
    Encoding column 5: DICTIONARY_V2[4404]
    Encoding column 6: DICTIONARY_V2[5553]
    Encoding column 7: DICTIONARY_V2[378283]
    Encoding column 8: DICTIONARY_V2[105678]
    Encoding column 9: DICTIONARY_V2[60943]

Orc使用Run-length编码和字典编码来压缩数据。这里是编码DICTIONARY_V2的含义。REF:ORCv1
| ENCODING | STREAM KIND | OPTIONAL | CONTENTS | | -------- | -------------- | -------- | -------------- | | DICTIONARY_V2| PRESENT | 是 | Boolean RLE | | | DATA | 否| 无符号整数RLE v2 | | | DICTIONARY_DATA | 否 | 字符串内容 | | | LENGTH | 否 | 无符号整数RLE v2 |
在字典编码中,如果值为[“Nevada”,“California”,“Nevada”,“California”和“Florida”],则DICTIONARY_DATA将是“CaliforniaFloridaNevada”,LENGTH将是[10,7,6],DATA将是[2,0,2,0,1]。
而无符号整数RLE v2也在REF中: ORCv1 在Hive 0.12中,ORC引入了Run Length Encoding版本2(RLEv2),其具有改进的压缩和固定位宽编码,以加快扩展速度。 RLEv2基于数据使用四个子编码: - 短重复 - 用于短序列中重复的值 - 直接 - 用于具有固定位宽的随机序列 - 修补基准 - 用于具有可变位宽的随机序列 - Delta - 用于单调递增或递减的序列
让我们聚焦于第一列。
# before repartition
Stream: column 1 section DATA start: 13640 length 209662
Stream: column 1 section LENGTH start: 223302 length 1158
Stream: column 1 section DICTIONARY_DATA start: 224460 length 231328
Encoding column 1: DICTIONARY_V2[48184]

# after repartition
Stream: column 1 section DATA start: 12315 length 4260547
Stream: column 1 section LENGTH start: 4272862 length 15955
Encoding column 1: DIRECT_V2

虽然我不知道Orc如何选择ENCODING,但是Orc认为在随机化后使用DIRECT_V2对于列1可以比使用DICTIONARY_V2节省更多的空间。实际上,在重新分区之后,空间几乎增加了10倍。(4260547+15955)/(209662+1158+231328)

其他列的大多数ENCODING没有改变,但大小已经增加。

比较

repartitioncoalesce
前者的文件大小是均匀的,以避免数据倾斜。
前者的数据大小变得更大。
*(潜在)*当过滤混沌数据时,ORC的行组索引不能使用。 在连接时,两者都需要再次洗牌。我使用上述数据进行测试,发现洗牌和排序之间的时间没有显着差异。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接