使用pem密钥和客户端证书连接Kafka SSL

4

我能够通过以下客户端配置文件中的ssl细节连接到kafka,并使用CLI(bin / kafka-console-consumer.sh)读取数据:

ssl.keystore.location=/test/keystore.jks
ssl.keystore.password=abcd1234
ssl.key.password=abcd1234

Command: bin/kafka-console-consumer.sh --bootstrap-server 'server details'  --topic topic_name --consumer.config client.properties --group group-id

但是我无法使用相同的数据从Python或Spark连接

consumer = KafkaConsumer(topic,bootstrap_servers=bootstrap_server,security_protocol='SSL',sasl_mechanism='PLAIN',ssl_certfile='certificate.pem',ssl_keyfile='pk.key')

我尝试更改上面代码中的多个选项,比如添加check_host_name等,但没有成功。 Kafka不是由我们团队拥有,而是由另一个团队管理,当我们请求访问时,我们会获得私钥和证书以及CA捆绑包和ARN名称。

从Spark(Python)中,我尝试了以下代码:

sdf1 = spark.readStream.format("kafka")
       .option("kafka.bootstrap.servers",bootstrap_server)
       .option("subscribe", topic_name)
       .option("startingOffsets", "latest")
       .option("kafka.security.protocol","SSL")
       .option("kafka.ssl.keystore.location",'keystore.jks')
       .option("kafka.ssl.keystore.password", '****')
       .option("kafka.ssl.key.password",'****')
       .load()

我遇到了这样的错误:"org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-xxxxxxx-xxxxx-xxxxx"。

上述错误可能与spark每次访问时生成唯一组ID有关。在Spark 3.0及以上版本中,只允许在Spark dataframe中使用组ID。我需要解决此问题的选项来修复Spark 2.4.4中的此问题。

任何建议都将不胜感激。

1个回答

0

您只需要提供用于验证访问主题的主体,而不管使用哪个消费者组。它看起来像这样:

kafka-acls --authorizer-properties zookeeper.connect=zk_ip_or_fqdn:2181  --add  --allow-principal User:"userName" --operation All --topic yourTopicName --group=*

在您的情况下,userName(主体名称)将是SSL证书的主题名称,格式为“CN=toto,OU=titi,...”。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接