Unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule


Environment: Spark 2.3.0, Scala 2.11.12, Kafka (latest version)

I have a secured Kafka cluster and I am trying to connect my Spark Streaming consumer to it. Here is my build.sbt file:

name := "kafka-streaming"
version := "1.0"

scalaVersion := "2.11.12"

// still want to be able to run in sbt
// https://github.com/sbt/sbt-assembly#-provided-configuration
run in Compile <<= Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run))

fork in run := true
javaOptions in run ++= Seq(
    "-Dlog4j.debug=true",
    "-Dlog4j.configuration=log4j.properties")

assemblyMergeStrategy in assembly := {
    case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat
    case PathList("META-INF", _*) => MergeStrategy.discard
    case _ => MergeStrategy.first
}

libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "2.3.0",
    "org.apache.spark" %% "spark-sql" % "2.3.0",
    "org.apache.spark" %% "spark-streaming" % "2.3.0",
    "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0",
    "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.0",
    "com.ibm.db2.jcc" % "db2jcc" % "db2jcc4"
)
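One possibility worth ruling out (an assumption on my part, not something confirmed in this post) is a version conflict on the transitive kafka-clients jar: the 0-10 connectors for Spark 2.3 pull in an old 0.10.x client, and if an even older client ends up on the classpath it may predate PlainLoginModule and the `sasl.jaas.config` client property. A hedged sketch of pinning a newer client in build.sbt:

```scala
// Hypothetical: explicitly pin kafka-clients so the jar that ships
// PlainLoginModule is the one actually on the runtime classpath.
// The version below is an assumption; match it to your broker.
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "1.0.0"
```

Running `sbt dependencyTree` (or inspecting the assembly jar) shows which kafka-clients version actually wins.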

Note that this is Spark 2.3.0 and I cannot change my Spark version.
Here is the part of my code where I try to connect the Spark Streaming consumer to the secured Kafka cluster:
val df = spark.readStream
            .format("kafka")
            .option("subscribe", "raw_weather")
            .option("kafka.bootstrap.servers", "<url:port>s")
            .option("kafka.security.protocol", "SASL_SSL")
            .option("kafka.sasl.mechanism" , "PLAIN")
            .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"" + "password" + "\";")
            .option("kafka.ssl.protocol", "TLSv1.2")
            .option("kafka.ssl.enabled.protocols", "TLSv1.2")
            .option("kafka.ssl.endpoint.identification.algorithm", "HTTPS")
            .load()
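Since the exception complains about class visibility rather than authentication itself, a quick sanity check (a sketch of mine, not part of the original post) is to verify that the LoginModule class can actually be loaded on the driver before starting the stream:

```scala
import scala.util.Try

// Returns true if the named class is loadable by the current classloader.
def classVisible(name: String): Boolean =
  Try(Class.forName(name)).isSuccess

// If this prints false, the kafka-clients jar carrying PlainLoginModule
// is missing from (or shaded differently on) the runtime classpath.
println(classVisible("org.apache.kafka.common.security.plain.PlainLoginModule"))
```

If the class is visible on the driver but the error persists, the mismatch is likely on the executor classpath or inside the JAAS parsing of the config string.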

When I try to run this program, the following error is thrown:
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
    at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:314)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
    >> at com.ibm.kafkasparkintegration.executables.WeatherDataStream$.getRawDataFrame(WeatherDataStream.scala:74)
    at com.ibm.kafkasparkintegration.executables.WeatherDataStream$.main(WeatherDataStream.scala:24)
    at com.ibm.kafkasparkintegration.executables.WeatherDataStream.main(WeatherDataStream.scala)
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: unable to find LoginModule class:  org.apache.kafka.common.security.plain.PlainLoginModule
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
    ... 11 more
Caused by: javax.security.auth.login.LoginException: unable to find LoginModule class:  org.apache.kafka.common.security.plain.PlainLoginModule
    at javax.security.auth.login.LoginContext.invoke(LoginContext.java:794)
    at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
    at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
    at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:69)
    at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:46)
    at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)
    ... 14 more

The >> in the error log points to load() in the snippet above. I have been trying for a few days but without much success.
1 Answer

I ran into the same error, and the fix was to change the class in jaas.config. The Confluent documentation for Azure (Databricks) says to use:
kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule

However, when connecting to Confluent Cloud directly, the correct class is the unshaded org.apache.kafka.common.security.plain.PlainLoginModule:
spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers.mkString(","))
  .option("kafka.security.protocol", "SASL_SSL")
  .option(
    "kafka.sasl.jaas.config",
    s"""org.apache.kafka.common.security.plain.PlainLoginModule required username="${confluentApiKey: String}" password="${confluentSecret: String}";"""
  )
  .option("kafka.ssl.endpoint.identification.algorithm", "https")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("subscribe", confluentTopicName)
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", "false")
  .load()
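As an alternative workaround for this class of error (an assumption of mine, not something verified in this thread), the login module can also be supplied through a classic JAAS configuration file instead of the kafka.sasl.jaas.config option; old kafka-clients versions that do not parse that option will still honor the file. The file name and credentials below are placeholders:

```
// kafka_client_jaas.conf (hypothetical path and credentials)
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="user"
  password="password";
};
```

It is then passed to the JVM, e.g. via spark-submit with `--driver-java-options "-Djava.security.auth.login.config=/path/to/kafka_client_jaas.conf"` (and the matching `spark.executor.extraJavaOptions` for executors).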
