Spark 3.0.1 tasks fail when using the zstd compression codec


I am running Spark 3.0.1 with user-provided Hadoop 3.2.0 and Scala 2.12.10 on Kubernetes.

Reading a snappy-compressed parquet file works fine. However, when I try to read a zstd-compressed parquet file, several tasks fail with the following error:

java.io.IOException: Decompression error: Version not supported
at com.github.luben.zstd.ZstdInputStream.readInternal(ZstdInputStream.java:164)
at com.github.luben.zstd.ZstdInputStream.read(ZstdInputStream.java:120)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
at java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2781)
at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2797)
at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:3274)
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:934)
at java.io.ObjectInputStream.&lt;init&gt;(ObjectInputStream.java:396)
at org.apache.spark.MapOutputTracker$.deserializeObject$1(MapOutputTracker.scala:954)
at org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:964)
at org.apache.spark.MapOutputTrackerWorker.$anonfun$getStatuses$2(MapOutputTracker.scala:856)
at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
at org.apache.spark.MapOutputTrackerWorker.getStatuses(MapOutputTracker.scala:851)
at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:808)
at org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:128)
at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:185)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

What I don't understand is that the tasks sometimes succeed on retry, but not always, so my job fails frequently. As mentioned, the same dataset compressed with snappy works fine. I have also tried building Spark and Hadoop myself and changing the zstd-jni version, but the behavior stays the same. Does anyone know what might be going on? Thanks!
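For reference, a minimal sketch of the kind of job that hits this: the path and column name below are hypothetical placeholders, not taken from the original post.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical reproduction sketch: read a zstd-compressed parquet dataset
// and force a shuffle, which is where the failing tasks show up.
val spark = SparkSession.builder()
  .appName("zstd-parquet-read") // placeholder app name
  .getOrCreate()

// Placeholder path; any zstd-compressed parquet dataset would do.
val df = spark.read.parquet("s3a://bucket/events_zstd/")

// Any wide transformation (groupBy, join, ...) triggers a shuffle. Note that
// the stack trace above fails while deserializing shuffle MapStatus data,
// not while decoding the parquet pages themselves.
df.groupBy("some_column").count().show()
```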

I ran into the same problem recently. Looking closely at the relevant code, MapOutputTracker$.deserializeMapStatuses, it appears that deserializing the map shuffle statuses is what fails. That data is also compressed, using the codec defined by spark.shuffle.mapStatus.compression.codec, which defaults to zstd. Changing it to lz4 solved the problem for me. - balaudt
@balaudt Thank you so much! That indeed solved my problem! - phzz
For a Jupyter Notebook Sparkmagic session, you can change the configuration by running `%%configure -f` followed on the next line by `{"conf":{"spark.shuffle.mapStatus.compression.codec":"lz4"}}`. - leezu
Upstream report: https://issues.apache.org/jira/browse/SPARK-35199 - leezu
1 Answer


As noted in the comments, I updated the Spark (3.0.1) configuration and added the following property to fix this permanently. The file path and the added setting are:

$SPARK_HOME/conf/spark-defaults.conf
spark.shuffle.mapStatus.compression.codec lz4
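
If editing spark-defaults.conf is not an option, the same property can also be set per application. A minimal sketch, assuming the codec is configured before the SparkSession is created (the app name is a placeholder):

```scala
import org.apache.spark.sql.SparkSession

// Sketch of setting the MapStatus codec per application instead of in
// spark-defaults.conf.
val spark = SparkSession.builder()
  .appName("zstd-workaround") // placeholder app name
  // Work around the zstd MapStatus deserialization failures by using lz4
  // for the shuffle MapStatus payload only; data files can remain zstd.
  .config("spark.shuffle.mapStatus.compression.codec", "lz4")
  .getOrCreate()
```

Equivalently, the property can be passed on the command line with `--conf spark.shuffle.mapStatus.compression.codec=lz4` when submitting the job.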
