CDH 5.8.3中yarn-cluster模式下Spark Streaming与Kafka的集成

3
我在运行从Kafka读取的Spark Streaming作业时遇到了一个奇怪的问题。我使用的是CDH 5.8.3版本: Spark版本是1.6.0,Kafka版本是0.9.0。
我的代码非常简单:
val kafkaParams = Map[String, String]("bootstrap.servers" -> brokersList, "auto.offset.reset" -> "smallest")
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(kafkaTopic))

如果我在yarn-client模式下运行它,就没有错误。但如果我在yarn-cluster模式下运行程序,就会出现异常。我的启动命令是:

spark-submit --master yarn-cluster --files /etc/hbase/conf/hbase-site.xml --num-executors 5 --executor-memory 4G --jars (somejars for HBase interaction) --class mypackage.MyClass myJar.jar

但是我遇到了这个错误:
java.lang.ClassCastException: kafka.cluster.Broker cannot be cast to kafka.cluster.BrokerEndPoint
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90)
at scala.Option.map(Option.scala:145)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:87)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:86)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:86)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:85)
at scala.util.Either$RightProjection.flatMap(Either.scala:523)
at org.apache.spark.streaming.kafka.KafkaCluster.findLeaders(KafkaCluster.scala:85)
at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:179)
at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:161)
at org.apache.spark.streaming.kafka.KafkaCluster.getEarliestLeaderOffsets(KafkaCluster.scala:155)
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:213)
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:211)
at scala.util.Either$RightProjection.flatMap(Either.scala:523)
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
at myPackage.Ingestion$.createStreamingContext(Ingestion.scala:120)
at myPackage.Ingestion$$anonfun$1.apply(Ingestion.scala:55)
at myPackage.Ingestion$$anonfun$1.apply(Ingestion.scala:55)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:864)
at myPackage.Ingestion$.main(Ingestion.scala:55)
at myPackage.Ingestion.main(Ingestion.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)

浏览互联网时,我认为这是一个版本问题,但我无法弄清楚为什么会发生这种情况,因为在yarn-client和yarn-cluster模式下运行的jars是相同的。

你有任何想法吗?

谢谢, 马可

2个回答

1

看起来Spark Streaming 1.6与Kafka 0.8兼容(请参见文档)。

我猜你在使用Kafka客户端0.9,它会从你的jar文件中以client模式被选择,但是当你切换到cluster模式时,将使用默认的Kafka客户端(0.8.2.1)。

我说得对吗?如果是这样,请尝试从构建中删除kafka客户端依赖项,并使用由spark-streaming-kafka提供的默认客户端(0.8客户端应该可以与0.9代理一起工作)。


亲爱的Arkadiusz,我们找到了问题所在。原因是我们安装了Splice Machine,它需要在YARN附加jar配置中设置其jar文件(其中也包括他们的spark-assembly)。现在我们正在与他们交流,以了解如何解决这个问题。我试图删除这个问题,但不幸的是,由于赏金的缘故,我无法这样做。 - mgaido

0
对于那些可能遇到相同问题的人,我们的问题是由于Splice Machine安装引起的。实际上,Splice Machine需要在YARN附加jar配置中设置它的jar(其中包括他们的spark-assembly)。
现在我们正在尝试找出一种方法,在不从集群中卸载Splice Machine的情况下使所有事情都运行起来。
谢谢。

有一种使用Splice Machine的不那么显眼的方法来完成这个任务。这只是没有被完全追求的事情。在直接的路径中,只有Splice Machine工作的文档规定必须将splice和hbase lib目录添加到YARN应用程序类路径(在CM中)。这对于Splice Machine来说很棒,但可能会对在YARN中运行的其他应用程序造成问题-正如本帖所证明的那样。解决方案很可能是删除额外的YARN应用程序类路径条目,并将这些类路径设置推到Splice Machine配置的其他位置。 - Aaron

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接