如何解决这个错误 org.apache.spark.sql.catalyst.errors.package$TreeNodeException?

15

我有两个进程,每个进程都要做以下三件事情: 1)连接Oracle数据库并读取指定表格 2)将数据形成dataframe并进行处理 3)将dataframe保存到Cassandra中。

如果我同时运行这两个进程,它们都会尝试从Oracle中读取数据, 而当第二个进程读取数据时,我会收到以下错误信息:

 ERROR ValsProcessor2: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange SinglePartition
+- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#290L])
   +- *(1) Scan JDBCRelation((SELECT * FROM BM_VALS WHERE ROWNUM <= 10) T) [numPartitions=2] [] PushedFilters: [], ReadSchema: struct<>
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:150)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294)
at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2770)
at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2769)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
at org.apache.spark.sql.Dataset.count(Dataset.scala:2769)
at com.snp.processors.BenchmarkModelValsProcessor2.process(BenchmarkModelValsProcessor2.scala:43)
at com.snp.utils.Utils$$anonfun$getAllDefinedProcessors$2.apply(Utils.scala:28)
at com.snp.utils.Utils$$anonfun$getAllDefinedProcessors$2.apply(Utils.scala:28)
at com.sp.MigrationDriver$$anonfun$main$2$$anonfun$apply$1.apply(MigrationDriver.scala:78)
at com.sp.MigrationDriver$$anonfun$main$2$$anonfun$apply$1.apply(MigrationDriver.scala:78)
at scala.Option.map(Option.scala:146)
at com.sp.MigrationDriver$$anonfun$main$2.apply(MigrationDriver.scala:75)
at com.sp.MigrationDriver$$anonfun$main$2.apply(MigrationDriver.scala:74)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
at com.sp.MigrationDriver$.main(MigrationDriver.scala:74)
at com.sp.MigrationDriver.main(MigrationDriver.scala)
Caused by: java.lang.NullPointerException
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.needToCopyObjectsBeforeShuffle(ShuffleExchangeExec.scala:163)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:300)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:91)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    ... 37 more

我在这里做错了什么?怎样修复它?

@Alexott 先生,这里出了什么问题? - BdEngineer
你能发一下你的代码吗? - sramalingam24
@Alexott 先生,这是代码 https://gist.github.com/shatestest/86ae9559c6114999e13a6eec3c80ec2b#file-driver-scala - BdEngineer
3个回答

4

我在第一个处理器/调用类中的 finally 块中关闭了 sparkSession。我将其从处理器中移出,并放在调用类中解决了问题。


1
我曾经遇到过同样的问题,我认为这个问题与从 Oracle 中读取数据时出现偏斜列有关,并导致 Spark 中只有单一分区。建议任何遇到此问题的人使用平衡分区列。

1
它听起来更像是一条评论而不是一个答案。 - Yan Sklyarenko

1
我不确定真正的原因是什么,唯一引起我注意的是以下SQL表达式:(SELECT * FROM BM_VALS WHERE ROWNUM <= 10) T - 这里的T是什么意思?
关于整体设计,我建议采用完全不同的方法。在你的情况下,有两个处理器处理从Oracle收集的相同数据,并且每个处理器单独获取数据。我建议将读取Oracle数据移动到单独的过程中,该过程将返回数据框架(您需要缓存它),然后您的处理器将使用该数据框架并将数据持久化到Cassandra。
或者如之前建议的那样,您可以将作业分成两部分 - 一部分从Oracle提取所有数据,并将数据框架存储到磁盘中(不是persist,而是使用write),例如,作为Parquet文件。然后是单独的作业,将从磁盘中取出数据,并执行必要的转换。
在这两种方案中,您

先生,“(SELECT * FROM BM_VALS WHERE ROWNUM <= 10) T”是一个简单的Oracle测试查询示例...没有T就不能工作...可能是Spark-SQL的形式问题.... 2)这里我需要运行不同的处理器,每个处理器读取来自Oracle的不同数据(为了模拟读取/运行多个处理器的过程,我从两个开始...在我的情况下,都从同一个查询中读取...但我的目标是不同的查询...)。3)(你需要缓存它)???我应该在哪里缓存它?RAM可以吗? - BdEngineer
  1. 将数据框架存储到磁盘中(不是持久化,而是使用写入),例如,作为Parquet文件。如何编写?你的意思是我需要存储在磁盘上吗?这会影响性能吗?网络问题?
- BdEngineer
我非常推荐阅读《Spark: The definitive guide》的前几章 - 这将为编写Spark代码提供足够的背景知识。 - Alex Ott
谢谢您先生,当然。先生,我不明白的是,当第二次从Oracle读取数据时出现错误...不确定在我的情况下是否有一些Spark设置出了问题。 - BdEngineer
先生,这段从Oracle读取数据的代码在我的共享代码中可扩展吗?如果不行,那么生产级别的最佳实践是什么?即:/*
  • 为给定的模式和查询从Oracle加载数据。 */ val ora_m_vals_df = DbUtils.readOracleData(oraOptionDfConfig, "OracleSchemaTest" , PARTITION_COLUMN, "(SELECT * FROM BM_VALS WHERE ROWNUM <= 10) T" );
"
- BdEngineer

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接