我正在使用 Spark 3.0.1,搭配用户提供的 Hadoop 3.2.0 和 Scala 2.12.10,在 Kubernetes 上运行。
当读取一个采用snappy压缩格式的parquet文件时,一切正常。然而,当我尝试读取一个采用zstd压缩格式的parquet文件时,几个任务失败并显示以下错误:
java.io.IOException: Decompression error: Version not supported
at com.github.luben.zstd.ZstdInputStream.readInternal(ZstdInputStream.java:164)
at com.github.luben.zstd.ZstdInputStream.read(ZstdInputStream.java:120)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
at java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2781)
at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2797)
at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:3274)
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:934)
at java.io.ObjectInputStream.(ObjectInputStream.java:396)
at org.apache.spark.MapOutputTracker$.deserializeObject$1(MapOutputTracker.scala:954)
at org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:964)
at org.apache.spark.MapOutputTrackerWorker.$anonfun$getStatuses$2(MapOutputTracker.scala:856)
at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
at org.apache.spark.MapOutputTrackerWorker.getStatuses(MapOutputTracker.scala:851)
at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:808)
at org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:128)
at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:185)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
我不理解的是,这些任务在重试后成功了,但并不总是如此,因此我的作业经常失败。如前所述,如果我使用相同的数据集压缩为 snappy,则一切正常。我还尝试构建 Spark 和 Hadoop,并更改 zstd-jni 版本,但仍然出现相同的行为。有人知道可能发生了什么吗?谢谢!
MapOutputTracker$.deserializeMapStatuses
后,似乎是 map shuffle 数据的反序列化失败了。这些数据还使用了由spark.shuffle.mapStatus.compression.codec
定义的压缩编解码器进行了压缩,默认情况下为 zstd。将其更改为lz4
就解决了我的问题。 - balaudt%%configure -f \NEWLINE {"conf":{"spark.shuffle.mapStatus.compression.codec":"lz4"}}
来更改配置。 - leezu