Spark tests on a local machine

I am running unit tests on Spark 1.3.1 with sbt test, and besides the unit tests being incredibly slow I keep running into java.lang.ClassNotFoundException: org.apache.spark.storage.RDDBlockId. Usually this means a dependency mismatch, but I can't figure out where it comes from. I tried installing everything on a new machine, including a fresh hadoop and a fresh ivy2, but I still run into the same problem.
Any help is greatly appreciated.
The exception:
Exception in thread "Driver Heartbeater" java.lang.ClassNotFoundException: 
    org.apache.spark.storage.RDDBlockId
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:270)

My build.sbt file:

libraryDependencies ++=  Seq( 
  "org.scalaz"              %% "scalaz-core" % "7.1.2" excludeAll ExclusionRule(organization = "org.slf4j"), 
  "com.typesafe.play"       %% "play-json" % "2.3.4" excludeAll ExclusionRule(organization = "org.slf4j"), 
  "org.apache.spark"        %% "spark-core" % "1.3.1" % "provided"  withSources() excludeAll (ExclusionRule(organization = "org.slf4j"), ExclusionRule("org.spark-project.akka", "akka-actor_2.10")), 
  "org.apache.spark"        %% "spark-graphx" % "1.3.1" % "provided" withSources() excludeAll (ExclusionRule(organization = "org.slf4j"), ExclusionRule("org.spark-project.akka", "akka-actor_2.10")), 
  "org.apache.cassandra"    % "cassandra-all" % "2.1.6", 
  "org.apache.cassandra"    % "cassandra-thrift" % "2.1.6", 
  "com.typesafe.akka" %% "akka-actor" % "2.3.11", 
  "com.datastax.cassandra"  % "cassandra-driver-core" % "2.1.6" withSources() withJavadoc() excludeAll (ExclusionRule(organization = "org.slf4j"),ExclusionRule(organization = "org.apache.spark"),ExclusionRule(organization = "com.twitter",name = "parquet-hadoop-bundle")), 
  "com.github.nscala-time"  %% "nscala-time" % "1.2.0" excludeAll ExclusionRule(organization = "org.slf4j") withSources(), 
  "com.datastax.spark"      %% "spark-cassandra-connector-embedded" % "1.3.0-M2" excludeAll (ExclusionRule(organization = "org.slf4j"),ExclusionRule(organization = "org.apache.spark"),ExclusionRule(organization = "com.twitter",name = "parquet-hadoop-bundle")), 
  "com.datastax.spark"      %% "spark-cassandra-connector" % "1.3.0-M2" excludeAll (ExclusionRule(organization = "org.slf4j"),ExclusionRule(organization = "org.apache.spark"),ExclusionRule(organization = "com.twitter",name = "parquet-hadoop-bundle")), 
  "org.slf4j"               % "slf4j-api"            % "1.6.1", 
   "com.twitter"            % "jsr166e" % "1.1.0", 
  "org.slf4j"               % "slf4j-nop" % "1.6.1" % "test", 
  "org.scalatest"           %% "scalatest" % "2.2.1" % "test" excludeAll ExclusionRule(organization = "org.slf4j") 
) 
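
Since the exception usually points at a version conflict somewhere on the test classpath, a quick way to narrow it down is to look at what sbt actually resolved. A minimal check from the sbt shell (assuming sbt 0.13.6+, as typically used with Spark 1.3-era builds; both tasks are built in):

evicted
show test:fullClasspath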

My Spark test settings (I have disabled all of it for the tests):

(spark.kryo.registrator,com.my.spark.MyRegistrator) 
(spark.eventLog.dir,) 
(spark.driver.memory,16G) 
(spark.kryoserializer.buffer.mb,512) 
(spark.akka.frameSize,5) 
(spark.shuffle.spill,false) 
(spark.default.parallelism,8) 
(spark.shuffle.consolidateFiles,false) 
(spark.serializer,org.apache.spark.serializer.KryoSerializer) 
(spark.shuffle.spill.compress,false) 
(spark.driver.host,10.10.68.66) 
(spark.akka.timeout,300) 
(spark.driver.port,55328) 
(spark.eventLog.enabled,false) 
(spark.cassandra.connection.host,127.0.0.1) 
(spark.cassandra.connection.ssl.enabled,false) 
(spark.master,local[8]) 
(spark.cassandra.connection.ssl.trustStore.password,password) 
(spark.fileserver.uri,http://10.10.68.66:55329) 
(spark.cassandra.auth.username,username) 
(spark.local.dir,/tmp/spark) 
(spark.app.id,local-1436229075894) 
(spark.storage.blockManagerHeartBeatMs,300000) 
(spark.executor.id,<driver>) 
(spark.storage.memoryFraction,0.5) 
(spark.app.name,Count all entries 217885402) 
(spark.shuffle.compress,false) 

An assembled or packaged jar submitted to standalone or Mesos works just fine! Any suggestions?
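
For reference, a minimal sketch of how a local-mode test context with the settings above might be wired up in ScalaTest (the suite name, helper structure and test body are assumptions; only the configuration keys and values are taken from the dump above):

import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FunSuite}

// Hypothetical suite; only the config keys/values come from the settings dump above.
class CountAllEntriesSpec extends FunSuite with BeforeAndAfterAll {

  private var sc: SparkContext = _

  override def beforeAll(): Unit = {
    val conf = new SparkConf()
      .setMaster("local[8]")
      .setAppName("Count all entries")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "com.my.spark.MyRegistrator")
      .set("spark.kryoserializer.buffer.mb", "512")
      .set("spark.shuffle.spill", "false")
      .set("spark.cassandra.connection.host", "127.0.0.1")
    sc = new SparkContext(conf)
  }

  override def afterAll(): Unit = {
    if (sc != null) sc.stop()
  }

  test("counts all entries") {
    assert(sc.parallelize(1 to 10).count() === 10L)
  }
}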
2 Answers

We ran into the same problem on Spark 1.6.0 (there is already a bug report for it). We fixed it by switching to the Kryo serializer (which you should be using anyway), so this appears to be a bug in the default JavaSerializer.
To get rid of it, simply do the following:
new SparkConf().setAppName("Simple Application").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
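
If you enable Kryo this way, you can also register the classes your tests serialize (optional; the record type below is just a placeholder):

import org.apache.spark.SparkConf

// Hypothetical record type; registering classes avoids Kryo writing full class names.
case class MyRecord(id: Long, name: String)

val conf = new SparkConf()
  .setAppName("Simple Application")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[MyRecord]))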

The cause was a large broadcast variable. Not sure why (it fit in memory), but removing it from the test case made it work.
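
For illustration, the kind of change described above might look like this (all names are hypothetical; sc is an existing local-mode SparkContext):

import org.apache.spark.SparkContext

// Hypothetical test helper illustrating the change described in this answer.
def countKnown(sc: SparkContext, bigLookupMap: Map[Int, Int]): Long = {
  val rdd = sc.parallelize(1 to 1000)

  // Before: the large map was wrapped in a broadcast variable inside the test case.
  // val lookup = sc.broadcast(bigLookupMap)
  // rdd.filter(x => lookup.value.contains(x)).count()

  // After: the plain value is captured by the closure instead, which was enough to make
  // the local test pass (on a real cluster this re-ships the map with every task).
  rdd.filter(x => bigLookupMap.contains(x)).count()
}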

I have seen this happen when cache() was called on a DataFrame, without any explicit broadcast variable. - Sim
Same problem on my side, but without broadcast variables or cache(). - blackbox
