Kafka Spark Structured Streaming with SASL_SSL authentication

3
I have been trying to connect to a SASL_SSL-enabled Kafka cluster using the Spark Structured Streaming API. I have passed the jaas.conf file to the executors, but it looks like I am unable to set the keystore and truststore values for authentication. I tried passing the values as mentioned in this Spark link, and also setting them in code, as in this link. Still no luck. Below are the logs.
20/02/28 10:00:53 INFO streaming.StreamExecution: Starting [id = e176f5e7-7157-4df5-93ce-1e267bae6125, runId = 03225a69-ec00-45d9-8092-1467da34980f]. Use flight/checkpoint to store the query checkpoint.
20/02/28 10:00:53 INFO yarn.ApplicationMaster: Final app status: SUCCEEDED, exitCode: 0
20/02/28 10:00:53 INFO spark.SparkContext: Invoking stop() from shutdown hook
20/02/28 10:00:53 INFO server.AbstractConnector: Stopped Spark@46202f7b{HTTP/1.1,[http/1.1]}{0.0.0.0:0}
20/02/28 10:00:53 INFO consumer.ConsumerConfig: ConsumerConfig values:
        metric.reporters = []
        metadata.max.age.ms = 300000
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        max.partition.fetch.bytes = 1048576
        bootstrap.servers = [broker1:9093, broker2:9093]
        ssl.keystore.type = JKS
        enable.auto.commit = false
        sasl.mechanism = GSSAPI
        interceptor.classes = null
        exclude.internal.topics = true
        ssl.truststore.password = null
        client.id =
        ssl.endpoint.identification.algorithm = null
        max.poll.records = 1
        check.crcs = true
        request.timeout.ms = 40000
        heartbeat.interval.ms = 3000
        auto.commit.interval.ms = 5000
        receive.buffer.bytes = 65536
        ssl.truststore.type = JKS
        ssl.truststore.location = null
        ssl.keystore.password = null
        fetch.min.bytes = 1
        send.buffer.bytes = 131072
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        group.id = spark-kafka-source-93d170e9-977c-40fc-9e5d-790d253fcff5-409016337-driver-0
        retry.backoff.ms = 100
        ssl.secure.random.implementation = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.trustmanager.algorithm = PKIX
        ssl.key.password = null
        fetch.max.wait.ms = 500
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        session.timeout.ms = 30000
        metrics.num.samples = 2
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        ssl.protocol = TLS
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.keystore.location = null
        ssl.cipher.suites = null
        security.protocol = SASL_SSL
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        auto.offset.reset = earliest

20/02/28 10:00:53 INFO ui.SparkUI: Stopped Spark web UI at http://<Server>:41037
20/02/28 10:00:53 ERROR streaming.StreamExecution: Query [id = e176f5e7-7157-4df5-93ce-1e267bae6125, runId = 03225a69-ec00-45d9-8092-1467da34980f] terminated with error
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
        at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:297)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:88)
        at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:243)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2$$anonfun$applyOrElse$1.apply(StreamExecution.scala:158)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2$$anonfun$applyOrElse$1.apply(StreamExecution.scala:155)
        at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
        at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:155)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:153)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
        at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan$lzycompute(StreamExecution.scala:153)
        at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan(StreamExecution.scala:147)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:276)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner  authentication information from the user
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
        at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
        at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
        ... 22 more
Caused by: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner  authentication information from the user
        at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Unknown Source)
        at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Unknown Source)
        at com.sun.security.auth.module.Krb5LoginModule.login(Unknown Source)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at javax.security.auth.login.LoginContext.invoke(Unknown Source)
        at javax.security.auth.login.LoginContext.access$000(Unknown Source)
        at javax.security.auth.login.LoginContext$4.run(Unknown Source)
        at javax.security.auth.login.LoginContext$4.run(Unknown Source)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.login.LoginContext.invokePriv(Unknown Source)
        at javax.security.auth.login.LoginContext.login(Unknown Source)
        at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:69)
        at org.apache.kafka.common.security.kerberos.KerberosLogin.login(KerberosLogin.java:110)
        at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:46)
        at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)
        ... 25 more
20/02/28 10:00:53 INFO cluster.YarnClusterSchedulerBackend: Shutting down all executors
20/02/28 10:00:53 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
20/02/28 10:00:53 INFO cluster.SchedulerExtensionServices: Stopping SchedulerExtensionServices
(serviceOption=None,
 services=List(),
 started=false)
20/02/28 10:00:53 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/02/28 10:00:53 INFO memory.MemoryStore: MemoryStore cleared
20/02/28 10:00:53 INFO storage.BlockManager: BlockManager stopped
20/02/28 10:00:53 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
20/02/28 10:00:53 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/02/28 10:00:53 INFO spark.SparkContext: Successfully stopped SparkContext
20/02/28 10:00:53 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with SUCCEEDED
20/02/28 10:00:53 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
20/02/28 10:00:53 INFO yarn.ApplicationMaster: Deleting staging directory hdfs://nameservice1/user/hasif.subair/.sparkStaging/application_1582866369627_0029
20/02/28 10:00:53 INFO util.ShutdownHookManager: Shutdown hook called
20/02/28 10:00:53 INFO util.ShutdownHookManager: Deleting directory /yarn/nm/usercache/hasif.subair/appcache/application_1582866369627_0029/spark-5addfec0-a99f-49e1-b9d1-671c331efb40

Code

    val rawData = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9093, broker2:9093")
      .option("subscribe", "hasif_test")
      .option("spark.executor.extraJavaOptions", "-Djava.security.auth.login.config=jaas.conf")
      .option("kafka.security.protocol", "SASL_SSL")
      .option("ssl.truststore.location", "/etc/connect_ts/truststore.jks")
      .option("ssl.truststore.password", "<PASSWORD>")
      .option("ssl.keystore.location", "/etc/connect_ts/keystore.jks")
      .option("ssl.keystore.password", "<PASSWORD>")
      .option("ssl.key.password", "<PASSWORD>")
      .load()
    rawData.writeStream.option("path", "flight/output")
      .option("checkpointLocation", "flight/checkpoint").format("csv").start()

Submitting the Spark job

spark2-submit --master yarn --deploy-mode cluster \
--conf spark.yarn.keytab=hasif.subair.keytab \
--conf spark.yarn.principal=hasif.subair@TEST.ABC \ 
--files /home/hasif.subair/jaas.conf \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
--conf "spark.kafka.clusters.hasif.ssl.truststore.location=/etc/ts/truststore.jks" \
--conf "spark.kafka.clusters.hasif.ssl.truststore.password=testcluster" \
--conf "spark.kafka.clusters.hasif.ssl.keystore.location=/etc/ts/keystore.jks" \
--conf "spark.kafka.clusters.hasif.ssl.keystore.password=testcluster" \
--conf "spark.kafka.clusters.hasif.ssl.key.password=testcluster" \
--jars spark-sql-kafka-0-10_2.11-2.2.0.jar \
--class TestApp test_app_2.11-0.1.jar \

jaas.conf

KafkaClient {
 com.sun.security.auth.module.Krb5LoginModule required
 useTicketCache=true
 principal="hasif.subair@TEST.ABC"
 useKeyTab=true
 serviceName="kafka"
 keyTab="hasif.subair.keytab"
 client=true;
};

Any help would be greatly appreciated.

Did you run bin/kafka-acls.sh --add --allow-principals user:myuser --operation ALL --topic myTopic --authorizer-properties zookeeper.connect=zookeeperHost:2181? - Giorgos Myrianthous
@Hasifsubair: Which version of Spark are you using here? - SrinR
2 Answers

7
Kafka's own configuration properties can be set in DataStreamReader.option by adding the kafka. prefix, for example:
stream.option("kafka.ssl.keystore.location", "/etc/connect_ts/keystore.jks")

Use kafka.ssl.truststore.location instead of ssl.truststore.location. Similarly, prefix the other SSL properties with kafka. and try again.
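
Applied to the code in the question, the reader might look like this (a sketch; the topic, paths, and password placeholders are taken from the question as-is):

    val rawData = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9093,broker2:9093")
      .option("subscribe", "hasif_test")
      .option("kafka.security.protocol", "SASL_SSL")
      // each Kafka consumer property carries the "kafka." prefix; the Spark
      // Kafka source strips the prefix and forwards the rest to the consumer
      .option("kafka.ssl.truststore.location", "/etc/connect_ts/truststore.jks")
      .option("kafka.ssl.truststore.password", "<PASSWORD>")
      .option("kafka.ssl.keystore.location", "/etc/connect_ts/keystore.jks")
      .option("kafka.ssl.keystore.password", "<PASSWORD>")
      .option("kafka.ssl.key.password", "<PASSWORD>")
      .load()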


1

I suspect the SSL values are not being picked up. As you can see in the logs, they show up as null:

ssl.truststore.location = null
ssl.truststore.password = null
ssl.keystore.password = null
ssl.keystore.location = null

If the values were set correctly, they would be reflected as:
ssl.truststore.location = /etc/connect_ts/truststore.jks
ssl.truststore.password = [hidden]
ssl.keystore.password = [hidden]
ssl.keystore.location = /etc/connect_ts/keystore.jks

Add the files with the --files argument, e.g. --files /etc/connect_ts/keystore.jks,/etc/connect_ts/truststore.jks, and give only the file names in the options. The files are not being picked up by the code correctly. - Raptor0009
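
A minimal sketch of this suggestion, assuming the two JKS files are readable on the submitting host (keytab, jars, and class are unchanged from the question's command):

spark2-submit --master yarn --deploy-mode cluster \
--conf spark.yarn.keytab=hasif.subair.keytab \
--conf spark.yarn.principal=hasif.subair@TEST.ABC \
--files /home/hasif.subair/jaas.conf,/etc/connect_ts/keystore.jks,/etc/connect_ts/truststore.jks \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
--jars spark-sql-kafka-0-10_2.11-2.2.0.jar \
--class TestApp test_app_2.11-0.1.jar

The code would then reference the stores by file name only, e.g. .option("kafka.ssl.truststore.location", "truststore.jks"), since YARN places each --files entry in the container's working directory.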
1
I ran into the same problem and this did not solve it (I used the same implementation). Is there something else that needs to be done? - Julia Bel
6
This points out the problem but doesn't explain the solution. - OneCricketeer
