java.lang.UnsupportedOperationException: error when writing in Spark

12
When I try to write a Dataset to a Parquet file, I get the following error.
18/11/05 06:25:43 ERROR FileFormatWriter: Aborting job null.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 84 in stage 1.0 failed 4 times, most recent failure: Lost task 84.3 in stage 1.0 (TID 989, ip-10-253-194-207.nonprd.aws.csp.net, executor 4): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary
        at org.apache.parquet.column.Dictionary.decodeToInt(Dictionary.java:48)
        at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getInt(OnHeapColumnVector.java:233)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

However, when I run dataset.show() I can see the data. I am not sure what to check to find the cause.

3 Answers

16

There is an easier way to detect schema differences between Parquet files: use the mergeSchema option, which reports the inconsistent fields in the log.

Sample code:

spark.read.option("mergeSchema", "True").parquet(fileList:_*) 

Sample log:

Caused by: org.apache.spark.SparkException: Failed to merge fields 'field1' and 'field1'. Failed to merge incompatible data types DoubleType and LongType
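For context, a minimal sketch of how the fileList variable used above might be built; the directory and file names here are illustrative, not from the original answer. With mergeSchema enabled, Spark reconciles the per-file schemas at read time and fails with a message naming the incompatible field, instead of the dictionary decode error from the question:

// Sketch only: paths are placeholders, replace them with your own part files.
val fileList = Seq(
  "/data/parquet_table/part-00000.parquet",
  "/data/parquet_table/part-00001.parquet"
)
// mergeSchema compares the schemas of all listed files and logs any mismatch.
val mergedDF = spark.read.option("mergeSchema", "true").parquet(fileList: _*)
mergedDF.printSchema()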

6

I ran into the same problem; in my case it was caused by a schema difference between the parquet files:

Given the following parquet directory containing a couple of files:

  • /user/user1/parquet_table/part-00000-1e73689f-69e5-471a-8510-1547d108fea3-c000.parquet
  • /user/user1/parquet_table/part-00000-276bf4c0-7214-4278-8131-53cd5339a50d-c000.parquet

When I tried to merge them (using spark2-shell):

val parquetFileDF = spark.read.parquet("/user/user1/parquet_table/part-00000-*.parquet")
val parquetFileDFCoal = parquetFileDF.coalesce(8)
parquetFileDFCoal.write.parquet("/tmp/testTemp/0001")

I got this exception:
[Stage 4:> (0 + 8) / 8]20/05/13 17:09:03 WARN scheduler.TaskSetManager: Lost task 5.0 in stage 4.0 (TID 116, node.localhost.localdomain, executor 70): org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
...
Caused by: java.lang.UnsupportedOperationException: parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary
at parquet.column.Dictionary.decodeToInt(Dictionary.java:48)
at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getInt(OnHeapColumnVector.java:233)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)

If you inspect each file with spark2-shell, you may find that the schemas differ. Here:
scala> val parquetFileDF = spark.read.parquet("/user/user1/parquet_table/part-00000-1e73689f-69e5-471a-8510-1547d108fea3-c000.parquet")
parquetFileDF: org.apache.spark.sql.DataFrame = [root_id: string, father_id: string ... 7 more fields]

scala> parquetFileDF.printSchema()
root
|-- root_id: string (nullable = true)
|-- father_id: string (nullable = true)
|-- self_id: string (nullable = true)
|-- group_name: string (nullable = true)
|-- father_name: string (nullable = true)
|-- cle: string (nullable = true)
|-- value: integer (nullable = true)


scala> val parquetFileDF = spark.read.parquet("/user/user1/parquet_table/part-00000-276bf4c0-7214-4278-8131-53cd5339a50d-c000.parquet ")
parquetFileDF: org.apache.spark.sql.DataFrame = [root_id: string, father_id: string ... 7 more fields]

scala> parquetFileDF.printSchema()
root
|-- root_id: string (nullable = true)
|-- father_id: string (nullable = true)
|-- self_id: string (nullable = true)
|-- group_name: string (nullable = true)
|-- father_name: string (nullable = true)
|-- cle: string (nullable = true)
|-- value: string (nullable = true)

You can see that the value field is an integer in one file and a string in the other. To fix this, you need to convert one of the files so that the data types match.
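One way to do that conversion is sketched below. This is an assumption about the desired fix (casting the string-typed file to integer to match the first file); the destination path is illustrative:

// Sketch: read the file whose `value` column is a string, cast it to match the
// integer type of the other file, and write a corrected copy elsewhere.
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.IntegerType

val fixedDF = spark.read
  .parquet("/user/user1/parquet_table/part-00000-276bf4c0-7214-4278-8131-53cd5339a50d-c000.parquet")
  .withColumn("value", col("value").cast(IntegerType))

fixedDF.write.parquet("/tmp/testTemp/fixed")  // placeholder output path

After writing the corrected copy, you would replace the original part file with it (or simply re-read everything with mergeSchema and an explicit cast) so the whole directory has a consistent schema.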


1

Have you checked all the logs carefully to make sure there are no OutOfMemory errors? It is also possible that you are using a data type that Parquet does not support.

Please share the relevant source code, including the schema definition (case class or otherwise) and the write operation.

