如何通过Spark更改写入文件的ZSTD压缩级别?

10
在Spark文档中指出,默认的zstd压缩级别为1。https://spark.apache.org/docs/latest/configuration.html 我在spark-defaults.conf和代码内部设置了不同的值。
val conf = new SparkConf(false)
conf.set("spark.io.compression.zstd.level", "22")
val spark = SparkSession.builder.config(conf).getOrCreate()
..

使用相同的输入,多次将其保存/写入parquet格式并使用zstd压缩,并没有改变输出文件的大小。如何在Spark中调整此压缩级别?

zstd 压缩等级 22 属于 --ultra 领域。只需“调到十一”并检查是否适用于您。 - darked89
1
你指出的对于命令行zstd工具是正确的。然而,在Spark中设置哪个值并不重要,因为它使用开源的zstd JNI实现,这些事情可能在https://github.com/luben/zstd-jni/blob/5ae1cf6b3cee822b78cc2a052dcf0a294b2946db/src/main/native/jni_zstd.c中处理。 - belce
1
问题在于这里的setLevel函数的level参数,https://github.com/apache/spark/blob/0494dc90af48ce7da0625485a4dc6917a244d580/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala 它从SparkConf中读取为conf.get(IO_COMPRESSION_ZSTD_LEVEL),但不知何故似乎没有生效。我尝试了不同的值,包括低于超级领域的值。 - belce
只是好奇,有人尝试使用spark.io.compression.codec="zstd"和将spark.io.compression.zstd.level设置为超过20的方式启动相当大的Spark管道(至少具有一些连接)吗? :) - ei-grad
嗨 @ei-grad,我试过在https://stackoverflow.com/questions/77204328/how-to-set-zstd-compression-level-in-aws-glue-job上提问了我的问题,但到目前为止还没有运气。 - undefined
3个回答

2
参数spark.io.compression.zstd.level是用于压缩中间文件(序列化的RDD、shuffle、广播、检查点)的编解码器。在大多数情况下,唯一重要的是压缩速度,所以默认值1是最佳选择(还应将spark.io.compression.codec设置为zstd,以使该参数生效)。
遗憾的是,在Spark中无法为Parquet编解码器指定压缩级别,即spark.sql.parquet.compression.codec
从Spark 3.2开始(使用parquet-mr>=1.12.0),有一个parquet.compression.codec.zstd.level选项,但似乎不起作用:
In [5]: for i in [1, 5, 10]: df.write.option('parquet.compression.codec.zstd.level', i
   ...: ).parquet(f"test-{i}.parquet", compression='zstd', mode='overwrite')
                                                                                
In [6]: !du -sh test-*.parquet
40M test-10.parquet
40M test-1.parquet
40M test-5.parquet

作为一种解决方法,可以使用来自“arrow”项目的Parquet实现(直接在C++中,或通过pyarrow / go等方式;它允许为每个列的编解码器指定“compression_level”,以及默认的“compression_level”值)在将数据写入仓库之前重新打包数据。
遗憾的是,“arrow-rs” Parquet实现不允许指定“compression_level”。但幸运的是,“parquet2”在“arrow2”中使用(arrow的无转换rust实现)- 允许

1

似乎在Spark中初始化parquet zstd压缩器的过程中,并没有使用parquet.compression.codec.zstd.level。该问题已被标记为通过PR解决,该PR在文档中明确了spark.io.compression.zstd.level :-(。 - ei-grad

0

您可以通过配置parquet.compression.codec.zstd.level来更改级别,例如...config("parquet.compression.codec.zstd.level","3")

有关更多属性,请查看org.apache.parquet.hadoop.codec.ZstandardCodec.java

由于其他提到的22似乎有点极端,我很想知道您的用例是什么。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接