创建Kafka流时出现了AbstractMethodError错误

18

我正在尝试使用createDirectStream方法打开Kafka(尝试版本0.11.0.2和1.0.1)的流,并收到此AbstractMethodError错误:

Translated:

我正在尝试使用createDirectStream方法打开Kafka流(尝试版本0.11.0.2和1.0.1),却遇到了AbstractMethodError错误:

Exception in thread "main" java.lang.AbstractMethodError
    at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.initializeLogIfNecessary(KafkaUtils.scala:39)
    at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.log(KafkaUtils.scala:39)
    at org.apache.spark.internal.Logging$class.logWarning(Logging.scala:66)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.logWarning(KafkaUtils.scala:39)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.fixKafkaParams(KafkaUtils.scala:201)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.<init>(DirectKafkaInputDStream.scala:63)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:147)
    at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:124)

这是我所称呼的方式:

val preferredHosts = LocationStrategies.PreferConsistent
    val kafkaParams = Map(
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[IntegerDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest"
    )

    val aCreatedStream = createDirectStream[String, String](ssc, preferredHosts,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

我在9092上运行Kafka,并且能够创建生产者和消费者并在它们之间传递消息,所以不确定为什么Scala代码无法正常工作。欢迎任何想法。


可能是解决Apache Spark依赖问题的重复问题。 - zero323
3个回答

21

原来我在使用Spark 2.3,但应该使用Spark 2.2。显然,这个方法在较新版本中被改为抽象方法,所以我一直得到这个错误。


我曾经遇到过类似问题,使用了不匹配的Spark-core 2.3.0和Spark-hive 2.1.0版本。必须确保它们的版本匹配。 - Uncle Long Hair
2
这里有一个很好的讨论,关于如何解决这个问题:https://community.hortonworks.com/articles/197922/spark-23-structured-streaming-integration-with-apa.html - Concrete Gannet

6

我也遇到了相同的异常,我的情况是我创建了一个应用程序jar包,并将其依赖于版本为2.1.0spark-streaming-kafka-0-10_2.11,而在尝试部署到Spark 2.3.0集群时出现了问题。


我在集群中运行spark submit时遇到了相同的问题。在我的情况下,开发环境是2.2.0.2.6.4.0-91,QA环境是2.3.1.3.0.1.0-187。你是如何解决这个问题的?能否请你解释一下。你使用的spark-streaming-kafka版本是哪个? - dileepVikram
1
我认为我把依赖版本改成了“spark-streaming-kafka-0-10_2.11”的2.3.0版本。 - Oliv

1

我收到了相同的错误。我将我的依赖项设置为与我的Spark解释器版本相同。

%spark2.dep
z.reset()
z.addRepo("MavenCentral").url("https://mvnrepository.com/")

z.load("org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.0")
z.load("org.apache.kafka:kafka-clients:2.3.0")

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接