Spark + Parquet + Snappy:Spark在洗牌数据后整体压缩比率下降

15

社区!

请帮助我了解如何在Spark中获得更好的压缩比?

让我描述一下场景:

  1. 我有一个数据集,在HDFS上称之为产品,使用Sqoop ImportTool作为parquet文件使用编解码器snappy导入。导入结果,我有100个文件,总大小为46 GB du,文件大小不同(最小11MB,最大1.5GB,平均值约为500MB)。总记录数略多于80亿条,有84列

  2. 我正在使用snappy进行简单的读取/重分区/写入Spark,结果是:

输出大小约为100 GB,具有相同数量的文件、相同的编解码器、相同的计数和相同的列。

代码片段:

val productDF = spark.read.parquet("/ingest/product/20180202/22-43/")

productDF
.repartition(100)
.write.mode(org.apache.spark.sql.SaveMode.Overwrite)
.option("compression", "snappy")
.parquet("/processed/product/20180215/04-37/read_repartition_write/general")
  1. 使用parquet-tools,我查看了随机的ingest和processed文件,它们如下所示:

ingest:

creator:                        parquet-mr version 1.5.0-cdh5.11.1 (build ${buildNumber}) 
extra:                          parquet.avro.schema = {"type":"record","name":"AutoGeneratedSchema","doc":"Sqoop import of QueryResult","fields"

and almost all columns looks like
AVAILABLE: OPTIONAL INT64 R:0 D:1

row group 1:                    RC:3640100 TS:36454739 OFFSET:4 

AVAILABLE:                       INT64 SNAPPY DO:0 FPO:172743 SZ:370515/466690/1.26 VC:3640100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 126518400000, max: 1577692800000, num_nulls: 2541633]

处理:

creator:                        parquet-mr version 1.5.0-cdh5.12.0 (build ${buildNumber}) 
extra:                          org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields"

AVAILABLE:                      OPTIONAL INT64 R:0 D:1
...

row group 1:                    RC:6660100 TS:243047789 OFFSET:4 

AVAILABLE:                       INT64 SNAPPY DO:0 FPO:4122795 SZ:4283114/4690840/1.10 VC:6660100 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE ST:[min: -2209136400000, max: 10413820800000, num_nulls: 4444993]

另一方面,如果没有重新分区或使用coalesce - 数据集的大小将保持接近数据摄取的大小。

  1. 接下来,我进行了以下操作:

    • 读取数据集并将其写回

      productDF
        .write.mode(org.apache.spark.sql.SaveMode.Overwrite)
        .option("compression", "none")
        .parquet("/processed/product/20180215/04-37/read_repartition_write/nonewithoutshuffle")
      
    • 读取数据集,重新分区并带有写回操作

      productDF
        .repartition(500)
        .write.mode(org.apache.spark.sql.SaveMode.Overwrite)
        .option("compression", "none")
        .parquet("/processed/product/20180215/04-37/read_repartition_write/nonewithshuffle")
      

结果为:80 GB 没有重新分区,283 GB 重新分区并生成相同数量的输出文件

80GB Parquet 元数据示例:

AVAILABLE:                       INT64 UNCOMPRESSED DO:0 FPO:456753 SZ:1452623/1452623/1.00 VC:11000100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: -1735747200000, max: 2524550400000, num_nulls: 7929352]

283 GB Parquet元数据示例:

AVAILABLE:                       INT64 UNCOMPRESSED DO:0 FPO:2800387 SZ:2593838/2593838/1.00 VC:3510100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: -2209136400000, max: 10413820800000, num_nulls: 2244255]

看起来,即使没有未压缩的数据,Parquet本身(含编码)也能大大减小数据大小。怎么做到的? :)

我尝试读取未压缩的80GB数据,重新分区并写回 - 结果变成了283 GB。

  • 对我来说,首要问题是为什么Spark重新分区/洗牌后文件大小会变大?

  • 第二个问题是如何有效地在Spark中洗牌数据以利用Parquet编码/压缩(如果有)?

总的来说,我不想让我的数据在Spark处理后变得更大,即使我没有改变任何内容。

另外,我无法找到是否有可配置的Snappy压缩率,例如-1…-9? 我知道gzip有这个功能,但在Spark / Parquet writer中控制这个速率的方式是什么?

感谢任何帮助!

谢谢!


为什么Spark Parquet文件的聚合比原始文件更大? - Alper t. Turker
1
谢谢 @user8371915!现在我明白为什么大小不同了,我尝试按数据集中的某个(幸运找到的)列重新分区,结果得到的是80GB而不是一直保持的250GB。 但第二个问题是,如何找出解决这种问题的方法。我尝试查看 DataFrameStatFunctions,但我不够强大,无法找到它们有用。可以有人建议如何处理数据组织问题吗? - Mikhail Dubkov
1
在我的特定数据集情况下,Sqoop导入结果相当小,约为50GB的压缩数据。我认为这是因为Sqoop导入的分区具有有序范围的主键,例如第一个分区的ID从1到100000,分区内的数据更接近,并且使用parquet和snappy具有更好的编码/压缩比率。 - Mikhail Dubkov
我尝试找到一种使用DataFrame API实现相同数据组织的方法,但发现从2.3.0开始将提供范围分区器,如https://dev59.com/t10Z5IYBdhLWcg3w3DQW中所讨论的那样。我将尝试到RDD级别并实现自定义范围分区器以测试数据分布。 - Mikhail Dubkov
2个回答

5

当你在dataframe上调用repartition(n)时,实际上执行的是循环分区。任何先前存在的数据本地性都会消失,熵会增加。因此,运行长度和字典编码器以及压缩编解码器没有太多可处理的内容。

因此,在重新分区时,需要使用repartition(n, col)版本,并给它一个能保留数据本地性的好列。

另外,由于您可能正在为下游作业优化sqooped表,因此可以使用sortWithinPartition进行更快的扫描。

df.repartition(100, $"userId").sortWithinPartitions("userId").write.parquet(...)


“is gone entropy has gone up” 这句话是什么意思? - Blue Clouds

1

这与Parquet如何压缩数据有关。简而言之,如果您要将1000行写入单个文件,请考虑一个包含字符串的列,Parquet使用字典编码来存储它们。

如果所有1000个字符串都不同,则需要使用较大的字典编码(具有1000个键映射)--通常称为大熵。 如果所有1000个字符串都相同,则需要使用较小的字典(仅具有一个键映射)--通常称为小熵

由于较大的字典会导致更多的数据(更高的熵情况),因此会导致磁盘上的文件大小更大。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接