社区!
请帮助我了解如何在Spark中获得更好的压缩比?
让我描述一下场景:
我有一个数据集,在HDFS上称之为产品,使用Sqoop ImportTool作为parquet文件使用编解码器snappy导入。导入结果,我有100个文件,总大小为46 GB du,文件大小不同(最小11MB,最大1.5GB,平均值约为500MB)。总记录数略多于80亿条,有84列
我正在使用snappy进行简单的读取/重分区/写入Spark,结果是:
输出大小约为100 GB,具有相同数量的文件、相同的编解码器、相同的计数和相同的列。
代码片段:
val productDF = spark.read.parquet("/ingest/product/20180202/22-43/")
productDF
.repartition(100)
.write.mode(org.apache.spark.sql.SaveMode.Overwrite)
.option("compression", "snappy")
.parquet("/processed/product/20180215/04-37/read_repartition_write/general")
- 使用parquet-tools,我查看了随机的ingest和processed文件,它们如下所示:
ingest:
creator: parquet-mr version 1.5.0-cdh5.11.1 (build ${buildNumber})
extra: parquet.avro.schema = {"type":"record","name":"AutoGeneratedSchema","doc":"Sqoop import of QueryResult","fields"
and almost all columns looks like
AVAILABLE: OPTIONAL INT64 R:0 D:1
row group 1: RC:3640100 TS:36454739 OFFSET:4
AVAILABLE: INT64 SNAPPY DO:0 FPO:172743 SZ:370515/466690/1.26 VC:3640100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 126518400000, max: 1577692800000, num_nulls: 2541633]
处理:
creator: parquet-mr version 1.5.0-cdh5.12.0 (build ${buildNumber})
extra: org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields"
AVAILABLE: OPTIONAL INT64 R:0 D:1
...
row group 1: RC:6660100 TS:243047789 OFFSET:4
AVAILABLE: INT64 SNAPPY DO:0 FPO:4122795 SZ:4283114/4690840/1.10 VC:6660100 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE ST:[min: -2209136400000, max: 10413820800000, num_nulls: 4444993]
另一方面,如果没有重新分区或使用coalesce - 数据集的大小将保持接近数据摄取的大小。
接下来,我进行了以下操作:
读取数据集并将其写回
productDF .write.mode(org.apache.spark.sql.SaveMode.Overwrite) .option("compression", "none") .parquet("/processed/product/20180215/04-37/read_repartition_write/nonewithoutshuffle")
读取数据集,重新分区并带有写回操作
productDF .repartition(500) .write.mode(org.apache.spark.sql.SaveMode.Overwrite) .option("compression", "none") .parquet("/processed/product/20180215/04-37/read_repartition_write/nonewithshuffle")
结果为:80 GB 没有重新分区,283 GB 重新分区并生成相同数量的输出文件
80GB Parquet 元数据示例:
AVAILABLE: INT64 UNCOMPRESSED DO:0 FPO:456753 SZ:1452623/1452623/1.00 VC:11000100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: -1735747200000, max: 2524550400000, num_nulls: 7929352]
283 GB Parquet元数据示例:
AVAILABLE: INT64 UNCOMPRESSED DO:0 FPO:2800387 SZ:2593838/2593838/1.00 VC:3510100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: -2209136400000, max: 10413820800000, num_nulls: 2244255]
看起来,即使没有未压缩的数据,Parquet本身(含编码)也能大大减小数据大小。怎么做到的? :)
我尝试读取未压缩的80GB数据,重新分区并写回 - 结果变成了283 GB。
对我来说,首要问题是为什么Spark重新分区/洗牌后文件大小会变大?
第二个问题是如何有效地在Spark中洗牌数据以利用Parquet编码/压缩(如果有)?
总的来说,我不想让我的数据在Spark处理后变得更大,即使我没有改变任何内容。
另外,我无法找到是否有可配置的Snappy压缩率,例如-1…-9? 我知道gzip有这个功能,但在Spark / Parquet writer中控制这个速率的方式是什么?
感谢任何帮助!
谢谢!
DataFrameStatFunctions
,但我不够强大,无法找到它们有用。可以有人建议如何处理数据组织问题吗? - Mikhail DubkovDataFrame
API实现相同数据组织的方法,但发现从2.3.0开始将提供范围分区器,如https://dev59.com/t10Z5IYBdhLWcg3w3DQW中所讨论的那样。我将尝试到RDD级别并实现自定义范围分区器以测试数据分布。 - Mikhail Dubkov