连接到在Docker中运行的Kafka

51

我按照Confluent文档(第2-3步骤)的描述,在本地机器上设置了单节点Kafka Docker容器。

此外,我还暴露了Zookeeper的端口2181和Kafka的端口9092,这样我就可以从运行在本地机器上的客户端连接到它们:

$ docker run -d \
    -p 2181:2181 \
    --net=confluent \
    --name=zookeeper \
    -e ZOOKEEPER_CLIENT_PORT=2181 \
    confluentinc/cp-zookeeper:4.1.0

$ docker run -d \
    --net=confluent \
    --name=kafka \
    -p 9092:9092 \
    -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
    -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 \
    -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
    confluentinc/cp-kafka:4.1.0

问题:当我尝试从主机连接到Kafka时,连接失败,因为它无法解析地址:kafka:9092

这是我的Java代码:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("client.id", "KafkaExampleProducer");
props.put("key.serializer", LongSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
KafkaProducer<Long, String> producer = new KafkaProducer<>(props);
ProducerRecord<Long, String> record = new ProducerRecord<>("foo", 1L, "Test 1");
producer.send(record).get();
producer.flush();

异常情况:

java.io.IOException: Can't resolve address: kafka:9092
    at org.apache.kafka.common.network.Selector.doConnect(Selector.java:235) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.common.network.Selector.connect(Selector.java:214) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:864) [kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:265) [kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:266) [kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238) [kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:176) [kafka-clients-2.0.0.jar:na]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
Caused by: java.nio.channels.UnresolvedAddressException: null
    at sun.nio.ch.Net.checkAddress(Net.java:101) ~[na:1.8.0_144]
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622) ~[na:1.8.0_144]
    at org.apache.kafka.common.network.Selector.doConnect(Selector.java:233) ~[kafka-clients-2.0.0.jar:na]
    ... 7 common frames omitted

问题:如何连接到在Docker中运行的Kafka?我的代码是从主机而不是Docker运行的。

注意:我知道理论上可以玩弄DNS设置和/etc/hosts,但这只是一个解决方法 - 不应该像这样。

也有类似的问题在这里,但它基于ches/kafka镜像。我使用的是基于confluentinc的镜像,这并不相同。


我相信这只在像这样设置网络的docker容器之间起作用。你在这里实际上创建了一个单独的网络(confluent),两个容器(zookeeper和kafka)可以相互通信,但是你不能直接从外部使用localhost访问它。如果你使用/etc/hosts,我认为它可以工作,但我不确定。然而,这不会是一个解决方法,因为容器并没有在localhost上运行,它们在confluent网络上运行。如果你指定IP地址而不是localhost,它是否有效? - Marius Waldal
6个回答

85

简而言之 - 从容器到主机的简单端口转发将无法工作... 不应修改主机文件(例如*NIX系统上的/etc/hosts)以解决Kafka网络问题,因为此解决方案不具备可移植性。

1)您要连接到哪个确切的IP /主机名+端口? 确保该值设置为代理的侦听器(而不是已弃用的advertised.host.name和advertised.port)。

2)确保列在bootstrap.servers中的服务器实际上是可解析的。例如,ping IP /主机名,使用netcat检查端口... 如果您的客户端在容器中,则需要从容器中执行此操作,而不仅仅是从主机中执行。

3)为了验证主机上的端口映射正确,确保docker ps显示kafka容器从0.0.0.0:<host_port> -> <advertised_listener_port>/tcp进行了映射。如果尝试从Docker网络外部运行客户端,则端口必须匹配。


以下答案使用confluentinc的docker镜像来回答问题,而不是wurstmeister/kafka。更具体地说,尽管后者是最受欢迎的Kafka docker镜像之一,但它们并没有得到很好的维护。如果您设置了KAFKA_ADVERTISED_HOST_NAME变量,请将其删除(它是一个已弃用的属性)。
以下部分尝试汇总使用其他镜像所需的所有细节。对于其他常用的Kafka镜像,它们都是在容器中运行的相同的Apache Kafka。你只需要依赖于它如何配置哪些变量使其成为这样。

wurstmeister/kafka

请参阅他们的README部分listener configuration,还可以阅读他们的连接性wiki

bitnami/kafka

如果你需要一个小型的容器,可以试试这个。这个镜像比Confluent的要小得多,并且比wurstmeister更加维护良好。请参考它们的README 获取监听器配置。

debezium/kafka

相关文档在这里提到

注意: advertised host和port设置已经被弃用。Advertised listeners覆盖了双方。和Confluent容器类似,Debezium可以使用以KAFKA_为前缀的broker设置来更新其属性。

其他

  • ubuntu/kafka 要求您通过 Docker 镜像参数添加 --override advertised.listeners=kafka:9092... 我发现这比环境变量不太便携,因此不建议使用。
  • spotify/kafka 已经过时且过时了。
  • fast-data-devlensesio/box 是一个全能的解决方案,包括模式注册表、Kafka Connect 等,但如果您只需要 Kafka,则会很臃肿。此外,在一个容器中运行多个服务是 Docker 的反模式。
  • 您自己的 Dockerfile - 为什么?这些其他选项有什么不完整的地方吗?从拉取请求开始,而不是从头开始。

如需补充阅读、一个完全功能的docker-compose和网络图,请参见this blog by @rmoff

Confluent quickstart (Docker) document 假设所有生产和消费请求都将在 Docker 网络内进行。

你可以通过在其自己的容器中运行 Kafka 客户端代码来解决连接到 kafka:9092 的问题,因为它使用了 Docker 网络桥接,但否则,你需要添加一些更多的环境变量以将容器公开到外部,同时仍然能够在 Docker 网络中运行。

首先添加一个协议映射PLAINTEXT_HOST:PLAINTEXT,将监听协议映射到 Kafka 协议

键:KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
值:PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT

然后在不同的端口上设置两个已公布的侦听器。 (kafka 这里是指 docker 容器名称;它也可能被命名为 broker,所以请仔细检查你的服务 + 主机名)。

键:KAFKA_ADVERTISED_LISTENERS
值:PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092

注意这里的协议与上面协议映射设置的左侧值匹配

在运行容器时,为主机端口映射添加-p 29092:29092,并公布PLAINTEXT_HOST监听器。


所以...(使用上述设置

如果仍然有问题,可以将KAFKA_LISTENERS设置为包括<PROTOCOL>://0.0.0.0:<PORT>,其中两个选项与广告设置和Docker转发端口匹配。

在同一台机器上的客户端,不在容器中

广告本地主机名和相关端口将允许您在容器外连接,就像您预期的那样。

换句话说,当运行任何Kafka客户端Docker网络之外(包括您可能已经在本地安装的CLI工具)时,请使用localhost:29092作为引导服务器,localhost:2181作为Zookeeper(需要Docker端口转发)

在另一台机器上的客户端

如果尝试从外部服务器连接,则需要广告主机的外部主机名/ IP(例如192.168.x.y),以及/或代替本地主机名。
仅仅通过端口转发广告本地主机名是不起作用的,因为Kafka协议仍将继续广告您配置的侦听器。

此设置需要 Docker 端口转发路由器端口转发(以及防火墙/安全组更改)如果不在同一本地网络,例如,您的容器正在云中运行,您想从本地机器与其交互。

客户端(或另一个代理)在容器中,位于同一主机上

这是最少出错的配置;您可以直接使用 DNS 服务名称。

在运行应用程序在Docker网络中时,使用kafka:9092(参见上面的广告PLAINTEXT监听器配置)作为引导服务器和zookeeper:2181作为Zookeeper,就像任何其他Docker服务通信一样(不需要任何端口转发)。


如果您使用单独的docker run命令或Compose文件,则需要使用compose networks部分或docker network --create手动定义共享network

查看完整的Confluent stack示例Compose文件更简单的单个broker示例

如果使用多个broker,则它们需要使用唯一的主机名+公告侦听器。 请参考示例

相关问题

如何从Docker(ksqlDB)连接到Kafka主机

附录

对于任何对Kubernetes部署感兴趣的人:


1
但是如果我只想使用9092呢?我的意思是在外部使用9092,而不是29092。 - Maria Pomazkina-Karpikova
@Maria 然后将端口映射PLAINTEXT_HOST://localhost:9092更改为-p 9092:9092。您可能仍需要为Docker网络内的容器设置单独的监听器。 - OneCricketeer
你的意思是在这种情况下我只需要"-p 9092:9092"而不需要"-p 9092:9092"吗? - Maria Pomazkina-Karpikova
@Maria 如果您在主机上有一个客户端,您将需要暴露一些端口。该端口将取决于广告侦听器的设置方式。 - OneCricketeer
哦,我真是太蠢了。我的意思是我不需要精确指定“-p 29092:29092”吗?我只需要“-p 9092:9092”就可以了吗? - Maria Pomazkina-Karpikova
3
@Maria 我想我已经回答了3次了?你可以这样做,但前提是你已经更改为 PLAINTEXT_HOST://localhost:9092,与我的答案相比。 - OneCricketeer

13

当您第一次连接到kafka节点时,它会返回所有kafka节点和连接的url。然后您的应用程序将尝试直接连接到每个kafka。

问题始终是kafka将给您什么样的url?这是为什么有KAFKA_ADVERTISED_LISTENERS的原因,kafka将使用它告诉世界如何访问它。

现在对于您的用例,有多个小事情需要考虑:

假设您设置了plaintext://kafka:9092

  • 如果您在docker compose中有一个使用kafka的应用程序,则可以使用kafka从kafka获取可通过docker网络解析的URL,从而工作正常。
  • 如果您尝试从主系统或另一个不在同一docker网络中的容器连接,则会失败,因为无法解析kafka名称

==> 要解决此问题,您需要拥有特定的DNS服务器(例如服务发现),但是对于小型项目来说这可能很麻烦。或者您可以在每个/etc/hosts中手动将kafka名称设置为容器IP

如果您设置了plaintext://localhost:9092

  • 如果在启动kafka时进行端口映射(-p 9092:9092),则在您的系统上这将正常工作
  • 如果您从容器中的应用程序(相同的docker网络或不同的docker网络)进行测试,则此方法将失败,因为localhost指的是该容器本身而不是kafka

==> 如果您有这个问题,希望在另一个容器中使用kafka客户端,一种解决方法是共享两个容器的网络(相同的IP)

最后的选择:在名称中设置IP:plaintext://x.y.z.a:9092(如文档https://kafka.apache.org/documentation/#brokerconfigs_advertised.listeners中所述,kafka广告url不能是0.0.0.0)

这对每个人来说都可以正常工作...但是,您如何获得x.y.z.a名称?

唯一的方法是在启动容器时硬编码此IP:docker run .... --net confluent --ip 10.x.y.z ...。请注意,您需要将IP适应为confluent子网中的一个有效IP。


谢谢 (+1)。第二个选项对我有用,可能我会坚持使用它。如果没有更好的解决方案出现(正如你已经提到的,我们失去了从容器内建立连接的能力),我稍后会接受这个答案。 - Sasha Shpota

1

在使用zookeeper之前

  1. 运行docker容器:docker container run --name zookeeper -p 2181:2181 zookeeper

在使用kafka之后

  1. 运行docker容器:docker container run --name kafka -p 9092:9092 -e KAFKA_ZOOKEEPER_CONNECT=192.168.8.128:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://你电脑的IP地址但不是localhost!:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 confluentinc/cp-kafka

kafka消费者和生产者的配置中

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.8.128:9092");
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.8.128:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

我按照这些规定运行我的项目。祝你好运,伙计。

ip_address_of_your_computer_but_not_localhost ... Localhost works fine, if you refer my answer... And Compose would be better than docker run - OneCricketeer
1
不应该使用localhost。因为你必须将你的容器视为外部系统。这就是为什么你应该将它指向计算机的IP地址,而不是localhost。 - İbrahim Ersin Yavaş
你可以从容器中进行端口转发,然后在主机上通过 localhost 访问。你尝试过我回答中列出的设置吗?或者阅读了 https://rmoff.net/2018/08/02/kafka-listeners-explained/ 吗? - OneCricketeer

0
最简单的解决方法是通过使用-h选项将自定义主机名添加到您的代理。
docker run -d \
    --net=confluent \
    --name=kafka \
    -h broker-1 \
    -p 9092:9092 \
    -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
    -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 \
    -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
    confluentinc/cp-kafka:4.1.0

并编辑您的/etc/hosts文件

127.0.0.1   broker-1

并使用:

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092");

2
不要编辑/etc/hosts文件。使用这段代码仍然会出现“UnknownHost: kafka”的错误...在这里只需使用127.0.0.1:9092,并相应地设置KAFKA_ADVERTISED_LISTENERSPLAINTEXT://localhost:9092即可。 - OneCricketeer

0

这使我能够在我的 M1 Mac 上的 Kafka 应用程序中访问 localhost:9092

Key: KAFKA_ADVERTISED_LISTENERS
Value: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092

加上端口转发:

ports
   - "9092:9092"

最后,针对我的设置,我必须以这种方式设置监听器键

Key: KAFKA_LISTENERS
Value: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092

1
这并没有为被接受的答案增加任何内容。"M1 Mac"并不重要,因为所有Docker主机都会表现相同。此外,端口9092是在此处“从docker”使用的... - OneCricketeer

0
      KAFKA_BROKER_ID: 1
      KAFKA_ADVERTISED_HOST_NAME: kafka:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

这个配置运行得很好。
确保在 Docker 内部连接 kafka:29092
在容器外部连接 localhost:9092
完整的工作中的 Docker Compose 配置。
    version: "3.3"

    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:6.2.0
        container_name: zookeeper
        networks:
          - broker-kafka
        ports:
          - "2181:2181"
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
          ALLOW_ANONYMOUS_LOGIN: yes
        volumes:
          - ./bitnami/zookeeper:/bitnami/zookeeper
    
      kafka:
        image: confluentinc/cp-kafka:6.2.0
        container_name: kafka
        networks:
          - broker-kafka
        depends_on:
          - zookeeper
        ports:
          - "9092:9092"
        expose:
          - "9092"
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ADVERTISED_HOST_NAME: kafka:9092
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
          # KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
          # KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
          # KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
          # KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
        volumes:
          - ./bitnami/kafka:/bitnami/kafka
    
      kafdrop:
        image: obsidiandynamics/kafdrop
        container_name: kafdrop
        ports:
          - "9000:9000"
        expose:
          - "9000"
        networks:
          - broker-kafka
        environment:
          KAFKA_BROKERCONNECT: "PLAINTEXT://kafka:29092"
          JVM_OPTS: "-Xms16M -Xmx48M -Xss180K -XX:-TieredCompilation -XX:+UseStringDeduplication -noverify"
          SPRING_PROFILES_ACTIVE: "dev"
        depends_on:
          - kafka
          - zookeeper
    
      consumer:
        container_name: consumer
        build:
          context: ./consumer
          dockerfile: Dockerfile
        environment:
          - KAFKA_TOPIC_NAME=app
          - KAFKA_SERVER=kafka
          - KAFKA_PORT=29092
        ports:
          - 8001:8001
        restart: "always"
        depends_on:
          - zookeeper
          - kafka
          - publisher
          - kafdrop
        networks:
          - broker-kafka
    
      publisher:
        container_name: publisher
        build:
          context: ./producer
          dockerfile: Dockerfile
        environment:
          - KAFKA_TOPIC_NAME=app
          - KAFKA_SERVER=kafka
          - KAFKA_PORT=29092
        ports:
          - 8000:8000
        restart: "always"
        depends_on:
          - zookeeper
          - kafka
          - kafdrop
        networks:
          - broker-kafka
        volumes:
          - ./testproducer:/producer
    
    networks:
      broker-kafka:
        driver: bridge

顺便说一下,你的容器数据在这里没有被保留。另外,你有尝试从不同的机器连接测试过吗? - OneCricketeer
@OneCricketeer 我正在尝试在我的Mac上使用上述的docker-compose.yml安装/设置Kafka,但是出现了错误:无法准备上下文:路径"./producer"未找到。我还不得不删除4个空格才能使yml开始工作。 - kamal
@OneCricketeer 创建了一个名为./producer的目录,但现在遇到错误: 解决失败:rpc错误:代码=未知描述=使用前端dockerfile.v0解决失败:无法读取Dockerfile:打开/var/lib/docker/tmp/buildkit-mount238317051/Dockerfile:没有这样的文件或目录 - kamal
@kamal 这不是我的答案。你可以在网上找到很多现有的Kafka compose文件,但正如错误所说,你的compose文件旁边没有生产者文件夹。 - OneCricketeer
@kamal 这不是我的答案。你可以在网上找到很多现有的Kafka compose文件,但正如错误所说,你的compose文件旁边没有producer文件夹。 - undefined
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接