Kafka - SSL handshake failed

I have set up SSL on a local Kafka instance. When I start the Kafka console producer/consumer against the SSL port, it fails with an SSL handshake error:
Karans-MacBook-Pro:keystore karanalang$ $CONFLUENT_HOME/bin/kafka-console-producer --broker-list localhost:9093 --topic karantest --producer.config $CONFLUENT_HOME/props/client-ssl.properties
>[2021-11-10 13:15:09,824] ERROR [Producer clientId=console-producer] Connection to node -1 (localhost/127.0.0.1:9093) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)
[2021-11-10 13:15:09,826] WARN [Producer clientId=console-producer] Bootstrap broker localhost:9093 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2021-11-10 13:15:10,018] ERROR [Producer clientId=console-producer] Connection to node -1 (localhost/127.0.0.1:9093) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)
[2021-11-10 13:15:10,019] WARN [Producer clientId=console-producer] Bootstrap broker localhost:9093 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2021-11-10 13:15:10,195] ERROR [Producer clientId=console-producer] Connection to node -1 (localhost/127.0.0.1:9093) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)

Here are the changes I made:

  1. Created the truststore and keystore (a representative command sequence is sketched below)
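
For reference, a representative keytool/openssl sequence for generating these stores. This is a sketch, not necessarily the exact commands used: file names such as ca-cert, cert-file, and san.cnf are illustrative, while the paths and the test123 password match the properties files below. Note the SAN entry, since clients connect as localhost:

# 1. Create a self-signed CA
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 -subj "/CN=kafka-ca" -passout pass:test123

# 2. Generate the broker key pair; the certificate name must match the host clients connect to
keytool -genkeypair -keystore kafka.keystore.jks -alias localhost -keyalg RSA -validity 365 \
  -storepass test123 -keypass test123 -dname "CN=localhost" -ext SAN=DNS:localhost

# 3. Sign the broker certificate with the CA and import the chain
#    (san.cnf contains one line: subjectAltName=DNS:localhost)
keytool -certreq -keystore kafka.keystore.jks -alias localhost -file cert-file -storepass test123
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 \
  -CAcreateserial -passin pass:test123 -extfile san.cnf
keytool -importcert -keystore kafka.keystore.jks -alias CARoot -file ca-cert -storepass test123 -noprompt
keytool -importcert -keystore kafka.keystore.jks -alias localhost -file cert-signed -storepass test123 -noprompt

# 4. Import the CA certificate into the truststore used by the broker and clients
keytool -importcert -keystore kafka.truststore.jks -alias CARoot -file ca-cert -storepass test123 -noprompt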

Here is the output of checking the SSL connection with the openssl command:

Karans-MacBook-Pro:keystore karanalang$ openssl s_client -debug -connect localhost:9093 -tls1
CONNECTED(00000005)
write to 0x13d7bdf90 [0x13e01ea03] (118 bytes => 118 (0x76))
0000 - 16 03 01 00 71 01 00 00-6d 03 01 81 e8 00 cd c4   ....q...m.......
0010 - 04 4b 64 86 3e 30 97 32-c3 66 3a 8c ed 05 bf 97   .Kd.>0.2.f:.....
0020 - ff d5 b2 a4 26 fe 99 c0-7f 94 a1 00 00 2e c0 14   ....&...........
0030 - c0 0a 00 39 ff 85 00 88-00 81 00 35 00 84 c0 13   ...9.......5....
---
0076 - <SPACES/NULS>
read from 0x13d7bdf90 [0x13e01a803] (5 bytes => 5 (0x5))
0005 - <SPACES/NULS>
4307385836:error:1400410B:SSL routines:CONNECT_CR_SRVR_HELLO:wrong version number:/System/Volumes/Data/SWE/macOS/BuildRoots/e90674e518/Library/Caches/com.apple.xbs/Sources/libressl/libressl-56.60.2/libressl-2.8/ssl/ssl_pkt.c:386:
---
no peer certificate available
---
No client certificate CA names sent
---
SSL handshake has read 5 bytes and written 0 bytes
---
New, (NONE), Cipher is (NONE)
Secure Renegotiation IS NOT supported
Compression: NONE
Expansion: NONE
No ALPN negotiated
SSL-Session:
    Protocol  : TLSv1
    Cipher    : 0000
    Session-ID: 
    Session-ID-ctx: 
    Master-Key: 
    Start Time: 1636579015
    Timeout   : 7200 (sec)
    Verify return code: 0 (ok)
---

Here is the server.properties file:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
# SSL CHANGE
listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
# SSL CHANGE
advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
ssl.client.auth=none

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000

##################### Confluent Metrics Reporter #######################
# Confluent Control Center and Confluent Auto Data Balancer integration
#
# Uncomment the following lines to publish monitoring data for
# Confluent Control Center and Confluent Auto Data Balancer
# If you are using a dedicated metrics cluster, also adjust the settings
# to point to your metrics kafka cluster.
#metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
#confluent.metrics.reporter.bootstrap.servers=localhost:9092
#
# Uncomment the following line if the metrics cluster has a single broker
#confluent.metrics.reporter.topic.replicas=1

############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0


############################# Confluent Authorizer Settings  #############################

# Uncomment to enable Confluent Authorizer with support for ACLs, LDAP groups and RBAC
#authorizer.class.name=io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer
# Semi-colon separated list of super users in the format <principalType>:<principalName>
#super.users=
# Specify a valid Confluent license. By default free-tier license will be used
#confluent.license=
# Replication factor for the topic used for licensing. Default is 3.
confluent.license.topic.replication.factor=1

# Uncomment the following lines and specify values where required to enable CONFLUENT provider for RBAC and centralized ACLs
# Enable CONFLUENT provider 
#confluent.authorizer.access.rule.providers=ZK_ACL,CONFLUENT
# Bootstrap servers for RBAC metadata. Must be provided if this broker is not in the metadata cluster
#confluent.metadata.bootstrap.servers=PLAINTEXT://127.0.0.1:9092
# Replication factor for the metadata topic used for authorization. Default is 3.
confluent.metadata.topic.replication.factor=1

# Replication factor for the topic used for audit logs. Default is 3.
confluent.security.event.logger.exporter.kafka.topic.replicas=1

# Listeners for metadata server
#confluent.metadata.server.listeners=http://0.0.0.0:8090
# Advertised listeners for metadata server
#confluent.metadata.server.advertised.listeners=http://127.0.0.1:8090

############################# Confluent Data Balancer Settings  #############################

# The Confluent Data Balancer is used to measure the load across the Kafka cluster and move data
# around as necessary. Comment out this line to disable the Data Balancer.
confluent.balancer.enable=true

# By default, the Data Balancer will only move data when an empty broker (one with no partitions on it)
# is added to the cluster or a broker failure is detected. Comment out this line to allow the Data
# Balancer to balance load across the cluster whenever an imbalance is detected.
#confluent.balancer.heal.uneven.load.trigger=ANY_UNEVEN_LOAD

# The default time to declare a broker permanently failed is 1 hour (3600000 ms).
# Uncomment this line to turn off broker failure detection, or adjust the threshold
# to change the duration before a broker is declared failed.
#confluent.balancer.heal.broker.failure.threshold.ms=-1

# Edit and uncomment the following line to limit the network bandwidth used by data balancing operations.
# This value is in bytes/sec/broker. The default is 10MB/sec.
#confluent.balancer.throttle.bytes.per.second=10485760

# Capacity Limits -- when set to positive values, the Data Balancer will attempt to keep
# resource usage per-broker below these limits.
# Edit and uncomment this line to limit the maximum number of replicas per broker. Default is unlimited.
#confluent.balancer.max.replicas=10000

# Edit and uncomment this line to limit what fraction of the log disk (0-1.0) is used before rebalancing.
# The default (below) is 85% of the log disk.
#confluent.balancer.disk.max.load=0.85

# Edit and uncomment these lines to define a maximum network capacity per broker, in bytes per
# second. The Data Balancer will attempt to ensure that brokers are using less than this amount
# of network bandwidth when rebalancing.
# Here, 10MB/s. The default is unlimited capacity.
#confluent.balancer.network.in.max.bytes.per.second=10485760
#confluent.balancer.network.out.max.bytes.per.second=10485760

# Edit and uncomment this line to identify specific topics that should not be moved by the data balancer.
# Removal operations always move topics regardless of this setting.
#confluent.balancer.exclude.topic.names=

# Edit and uncomment this line to identify topic prefixes that should not be moved by the data balancer.
# (For example, a "confluent.balancer" prefix will match all of "confluent.balancer.a", "confluent.balancer.b",
# "confluent.balancer.c", and so on.)
# Removal operations always move topics regardless of this setting.
#confluent.balancer.exclude.topic.prefixes=

# The replication factor for the topics the Data Balancer uses to store internal state.
# For anything other than development testing, a value greater than 1 is recommended to ensure availability.
# The default value is 3.
confluent.balancer.topic.replication.factor=1

################################## Confluent Telemetry Settings  ##################################

# To start using Telemetry, first generate a Confluent Cloud API key/secret. This can be done with
# instructions at https://docs.confluent.io/current/cloud/using/api-keys.html. Note that you should
# be using the '--resource cloud' flag.
#
# After generating an API key/secret, to enable Telemetry uncomment the lines below and paste
# in your API key/secret.
#
#confluent.telemetry.enabled=true
#confluent.telemetry.api.key=<CLOUD_API_KEY>
#confluent.telemetry.api.secret=<CCLOUD_API_SECRET>

############ SSL #################

ssl.truststore.location=/Users/karanalang/Documents/Technology/confluent-6.2.1/ssl_certs/truststore/kafka.truststore.jks
ssl.truststore.password=test123
ssl.keystore.location=/Users/karanalang/Documents/Technology/confluent-6.2.1/ssl_certs/keystore/kafka.keystore.jks
ssl.keystore.password=test123
ssl.key.password=test123

# confluent.metrics.reporter.bootstrap.servers=localhost:9093
# confluent.metrics.reporter.security.protocol=SSL
# confluent.metrics.reporter.ssl.truststore.location=/Users/karanalang/Documents/Technology/confluent-6.2.1/ssl_certs/truststore/kafka.truststore.jks
# confluent.metrics.reporter.ssl.truststore.password=test123
# confluent.metrics.reporter.ssl.keystore.location=/Users/karanalang/Documents/Technology/confluent-6.2.1/ssl_certs/keystore/kafka.keystore.jks
# confluent.metrics.reporter.ssl.keystore.password=test123
# confluent.metrics.reporter.ssl.key.password=test123

client-ssl.properties:

bootstrap.servers=localhost:9093
security.protocol=SSL
ssl.truststore.location=/Users/karanalang/Documents/Technology/confluent-6.2.1/ssl_certs/truststore/kafka.truststore.jks
ssl.truststore.password=test123
ssl.keystore.location=/Users/karanalang/Documents/Technology/confluent-6.2.1/ssl_certs/keystore/kafka.keystore.jks
ssl.keystore.password=test123
ssl.key.password=test123

Commands used to start the console producer/consumer:

$CONFLUENT_HOME/bin/kafka-console-producer --broker-list localhost:9093 --topic karantest --producer.config $CONFLUENT_HOME/props/client-ssl.properties
$CONFLUENT_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9093 --topic karantest --consumer.config $CONFLUENT_HOME/props/client-ssl.properties --from-beginning

Any ideas on how to resolve this?
Update: when I try to debug (using export KAFKA_OPTS=-Djavax.net.debug=all), I get this error:
javax.net.ssl|DEBUG|0E|kafka-producer-network-thread | console-producer|2021-11-10 14:04:26.107 PST|SSLExtensions.java:173|Ignore unavailable extension: status_request
javax.net.ssl|DEBUG|0E|kafka-producer-network-thread | console-producer|2021-11-10 14:04:26.107 PST|SSLExtensions.java:173|Ignore unavailable extension: status_request
javax.net.ssl|ERROR|0E|kafka-producer-network-thread | console-producer|2021-11-10 14:04:26.108 PST|TransportContext.java:341|Fatal (CERTIFICATE_UNKNOWN): No name matching localhost found (
"throwable" : {
  java.security.cert.CertificateException: No name matching localhost found
    at java.base/sun.security.util.HostnameChecker.matchDNS(HostnameChecker.java:234)
    at java.base/sun.security.util.HostnameChecker.match(HostnameChecker.java:103)
    at java.base/sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:455)
    at java.base/sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:429)
    at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:283)
    at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:141)
    at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1335)
    at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.onConsumeCertificate(CertificateMessage.java:1232)
    at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.consume(CertificateMessage.java:1175)
    at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:392)
    at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:443)
    at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1074)
    at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1061)
    at java.base/java.security.AccessController.doPrivileged(Native Method)
    at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:1008)
    at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:509)
    at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:601)
    at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:447)
    at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:332)
    at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:229)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:563)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:499)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:639)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:327)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242)
    at java.base/java.lang.Thread.run(Thread.java:829)}

3 Answers

Try setting the endpoint identification algorithm for the producer and consumer:
ssl.endpoint.identification.algorithm=
producer.ssl.endpoint.identification.algorithm=
consumer.ssl.endpoint.identification.algorithm=
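
For the console clients in this question, the empty value can simply be appended to client-ssl.properties. An empty algorithm disables hostname verification (enabled by default, as https, since Kafka 2.0), which is exactly what the "No name matching localhost found" failure in the update comes from:

# appended to $CONFLUENT_HOME/props/client-ssl.properties; an empty value disables hostname verification
ssl.endpoint.identification.algorithm=

The longer-term fix is to regenerate the broker certificate with CN=localhost or a SAN of DNS:localhost so that verification can stay enabled.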

First check whether you have a connectivity problem.

You can test access with:
openssl s_client -debug -connect servername:port -tls1_2

The output must end with: "Verify return code: 0 (ok)"

Otherwise, you do not have access.
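
It is also worth checking which name the broker certificate actually presents, since the failure in the question's update is a hostname mismatch. Two quick checks, assuming the port, keystore path, and password from the question:

openssl s_client -connect localhost:9093 -tls1_2 </dev/null 2>/dev/null | openssl x509 -noout -subject
keytool -list -v -keystore /Users/karanalang/Documents/Technology/confluent-6.2.1/ssl_certs/keystore/kafka.keystore.jks -storepass test123 | grep -E "Owner|DNSName"

For hostname verification to pass, the CN or a SAN DNSName must be localhost.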

