如何解决java.lang.ClassCastException:无法将scala.collection.immutable.List的实例分配给字段类型scala.collection.Seq?

15

这个错误是最难追踪的。我不确定发生了什么事情。我在我的本地机器上运行一个Spark集群。因此,整个Spark集群都在一个主机(127.0.0.1)下,我在独立模式下运行。

JavaPairRDD<byte[], Iterable<CassandraRow>> cassandraRowsRDD= javaFunctions(sc).cassandraTable("test", "hello" )
   .select("rowkey", "col1", "col2", "col3",  )
   .spanBy(new Function<CassandraRow, byte[]>() {
        @Override
        public byte[] call(CassandraRow v1) {
            return v1.getBytes("rowkey").array();
        }
    }, byte[].class);

Iterable<Tuple2<byte[], Iterable<CassandraRow>>> listOftuples = cassandraRowsRDD.collect(); //ERROR HAPPENS HERE
Tuple2<byte[], Iterable<CassandraRow>> tuple = listOftuples.iterator().next();
byte[] partitionKey = tuple._1();
for(CassandraRow cassandraRow: tuple._2()) {
    System.out.println("************START************");
    System.out.println(new String(partitionKey));
    System.out.println("************END************");
}

这个错误是最难追踪的。它明显发生在 cassandraRowsRDD.collect() ,但我不知道为什么?

这个错误是最难以追踪的,它显然会在cassandraRowsRDD.collect()处发生,但我不知道原因。

16/10/09 23:36:21 ERROR Executor: Exception in task 2.3 in stage 0.0 (TID 21)
java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2006)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

这里是我使用的版本

Scala code runner version 2.11.8  // when I run scala -version or even ./spark-shell


compile group: 'org.apache.spark' name: 'spark-core_2.11' version: '2.0.0'
compile group: 'org.apache.spark' name: 'spark-streaming_2.11' version: '2.0.0'
compile group: 'org.apache.spark' name: 'spark-sql_2.11' version: '2.0.0'
compile group: 'com.datastax.spark' name: 'spark-cassandra-connector_2.11' version: '2.0.0-M3': 

在引入了一些叫做 "provided" 的东西后,我的 gradle 文件看起来像是这样的。实际上它似乎并不存在,但谷歌建议创建一个,所以我的 build.gradle 文件就像这样。

group 'com.company'
version '1.0-SNAPSHOT'

apply plugin: 'java'
apply plugin: 'idea'

repositories {
    mavenCentral()
    mavenLocal()
}

configurations {
    provided
}
sourceSets {
    main {
        compileClasspath += configurations.provided
        test.compileClasspath += configurations.provided
        test.runtimeClasspath += configurations.provided
    }
}

idea {
    module {
        scopes.PROVIDED.plus += [ configurations.provided ]
    }
}

dependencies {
    compile 'org.slf4j:slf4j-log4j12:1.7.12'
    provided group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.0.0'
    provided group: 'org.apache.spark', name: 'spark-streaming_2.11', version: '2.0.0'
    provided group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.0.0'
    provided group: 'com.datastax.spark', name: 'spark-cassandra-connector_2.11', version: '2.0.0-M3'
}



jar {
    from { configurations.provided.collect { it.isDirectory() ? it : zipTree(it) } }
   // with jar
    from sourceSets.test.output
    manifest {
        attributes 'Main-Class': "com.company.batchprocessing.Hello"
    }
    exclude 'META-INF/.RSA', 'META-INF/.SF', 'META-INF/*.DSA'
    zip64 true
}

我使用分叉 https://www.scala-sbt.org/release/docs/Forking.html 解决了这个问题。 - Kakaji
6个回答

18

我遇到了同样的问题,通过将我的应用程序 jar 添加到 Spark 的 classpath 中解决了它。

spark = SparkSession.builder()
        .appName("Foo")
        .config("spark.jars", "target/scala-2.11/foo_2.11-0.1.jar")

1
这对我有用,但为什么需要这样做?有人知道吗?如果确实需要,我希望Spark能自动解决这个问题。 - Nick
6
我认为,每当您使用引用项目中方法/类的lambda执行任何类型的map操作时,都需要提供它们作为额外的jar文件。Spark确实会序列化lambda本身,但不会一起拉取其依赖项。不确定为什么错误消息没有提供有用信息。 - Holger Brandl
我通过将主应用程序代码与Uber JAR分离,并通过extraClassPath提供它来修复了这个问题:.config("spark.executor.extraClassPath","/app/lib/original-SparkApp-1.0-SNAPSHOT.jar") - CᴴᴀZ

6

我遇到了同样的异常,并研究了多个相关的 Jira(92191267518075)。

我认为异常名称很令人困惑,真正的问题是 Spark 集群和驱动程序应用之间的 环境设置不一致

例如,我在 conf/spark-defaults.conf 中使用以下行启动了我的 Spark 集群:

spark.master                     spark://master:7077

当我使用以下一行启动我的驱动程序(即使使用spark-submit启动程序):

sparkSession.master("spark://<master ip>:7077")

在这里, 是节点 master 的正确IP地址,但由于这个简单的不一致性,程序将无法运行。

因此,我建议所有驱动程序应该使用spark-submit启动,并且不要在驱动代码中重复任何配置(除非您需要覆盖某些配置)。换句话说,只需让spark-submit以与运行Spark集群相同的方式设置您的环境即可。

3
在我的情况下,我需要添加 spark-avro jar 包(我将它放在主 jar 包旁边的 /lib 文件夹中):
SparkSession spark = SparkSession.builder().appName("myapp").getOrCreate();
...
spark.sparkContext().addJar("lib/spark-avro_2.11-4.0.0.jar");

1
在我的情况下,我不得不使用 spark.sparkContext.addJar("lib/spark-avro_2.11-4.0.0.jar");(括号已删除)。 - Selnay

1
您的call()方法应该返回以下格式的byte[]。
@Override
public byte[] call(CassandraRow v1) {
  return v1.getBytes("rowkey").array();
}

如果您仍然遇到问题,请检查您的依赖项版本,如Jira中所述 https://issues.apache.org/jira/browse/SPARK-9219

嗨!对不起,我确实有.array(),我刚刚更新了问题。看起来我复制代码时弄错了一些地方,但现在应该没问题了。 - user1870400
我也看到了那个链接,但我无法弄清楚它的含义,所以我粘贴了我正在使用的所有版本。我正在使用Java 8,所以我不太了解Scala相关的内容,也不明白将库标记为“provided”是什么意思。 - user1870400
我测试了你的代码,它在使用Spark 2.0.0的独立模式下运行良好。尝试清理你的构建环境,重新构建并进行测试。 "provided"依赖项意味着JAR将在运行时可用。请查看https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#Dependency_Scope - abaghel
你标记它们为已提供了吗?如果是这样,你从上面的哪些库中标记它们为已提供? - user1870400
我正在使用Maven依赖项在Eclipse中运行Spark Java程序,因此我没有将它们标记为提供的。如果您想使用spark-submit在集群中运行构建的jar并且要使用Spark提供的jar,则可以将它们标记为提供的。请检查构建环境和集群环境中的jar文件和版本。 - abaghel

0

检查你的代码 - 在Intellij中:Analyze... -> 检查代码。如果你有与序列化相关的废弃方法,请修复它。或者简单地尝试降低Spark或Scala版本。在我的情况下,我将Scala版本降低到2.10,所有问题都解决了。


0

我在Spark集群中的一个Ubuntu节点上使用Eclipse运行作业时遇到了同样的问题。我将UDF创建为单独的Java类。在本地运行Spark时一切正常,但转向Yarn时会抛出与问题中相同的异常。

我通过将生成的类的路径放入Spark类路径中来解决了这个问题,其中包括类似于Holger Brandl建议的UDF类。

我创建了一个classpath变量:

String cpVar = "..../target/classes"

并将其作为配置变量添加到Spark中:

.config("spark.driver.extraClassPath", cpVar)
.config("spark.executorEnv.CLASSPATH", cpVar)

编辑:

仅将类路径添加到驱动节点可以解决问题,但集群中的其他节点仍然可能出现相同的错误。我得到的最终解决方案是在每次构建后将生成的类放入HDFS,并为Spark设置类路径以指向HDFS文件夹,如下所示。

sparkSession.sparkContext().addJar("hdfs:///user/.../classes");

请查看TheMP的答案。

你知道是否可以指定Maven仓库,而不是每次都创建JAR文件吗? - undefined
每次提交都会生成类,因此您需要每次都进行替换。 - undefined

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接