在WSL2中通过IDE连接到Kafka服务器时出现错误。

4
我无法通过Intellij或者VSCode连接运行在Ubuntu上的kafka服务器(我在WSL2上尝试的第一个服务器)。我甚至尝试使用虚拟机的IP,但是没有成功。根据这篇文档https://learn.microsoft.com/en-us/windows/wsl/compare-versions,我理解我们应该能够使用'localhost'进行连接。我有什么遗漏吗?
以下是我的代码:
    Properties producerProperties = new Properties();
    producerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(producerProperties);

    ProducerRecord<String, String> record = new ProducerRecord<>("topic_1", "hello from java");
    producer.send(record);

    producer.flush();
    producer.close();

这里是错误信息:

错误日志


请参阅https://dev59.com/bL3pa4cB1Zd3GeqPgIlp#65553634。 - undefined
5个回答

4

暂时禁用IPV6对我有用。我按照Dhruv Roy Talukdar的答案中的链接到这个文章

总之,创建一个shell脚本:

$ cat disable_ip6.sh
sudo sysctl -w net.ipv6.conf.all.disable_ipv6=1
sudo sysctl -w net.ipv6.conf.default.disable_ipv6=1
sudo sysctl -w net.ipv6.conf.lo.disable_ipv6=1enter code here

在启动zookeeper和kafka服务器之前运行它。


在Akka HTTP的背景下,简单而有效。 - undefined

2

WSL2目前存在一个网络问题,阻止外部程序连接运行在WSL2上的Kafka(例如Java程序、Conduktor等);

对我有用的方法是在WSL2上禁用ipv6,这样就可以工作了。 您可以按照此文章中的说明禁用它。有两种方法:

  • 临时禁用服务:每次重启后都需要这样做。我是按照上述文章操作的。
  • 永久禁用服务:我按照这个stackoverflow的答案进行操作的。这种方法对我来说更容易一些。

1

以下是对我有用的方法。请注意,我还必须在我的电脑上启用企业 VPN 才能连接到公司网络。

基本上,如果我们使用 WSL 机器的 IP 地址,事情就会起作用。

  1. 找到 WSL IP 地址。
    启动 WSL 并执行 hostname -I
  2. 将第 1 步中的 IP 地址添加到 <path/to>/kafka/config/server.properties advertised.listeners=PLAINTEXT://x.x.x.x:9092
    重启 Kafka。
  3. 运行在 Windows 上的客户端应用程序中,使用与第 1 步相同的 IP 地址作为引导服务器。

0

您需要将代码中的 localhost 替换为 0.0.0.0(所有 IP)

像这样:

producerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "0.0.0.0:9092");

在您提供的链接中的 "additional-networking-considerations" 部分有详细说明。


尝试过了,没有起作用。我猜想这可能与防火墙有关。 - undefined
好的,我想我误解了你想要做什么。暂时在你的代码中保留localhost:9092。问题是你如何在WSL2中运行kafkaserver。服务器需要绑定到0.0.0.0。 - undefined

0

我遇到了同样的问题,解决起来相当困难。突破口是当我发现了WSL2中这个已关闭但显然仍有问题的问题。基本上,问题出在我无法从Windows 10的IntelliJ访问Ubuntu/WSL2的本地主机。所以,当我在IntelliJ中编译和运行程序时,就会出现你发布的错误。

我的设置的一些细节:

操作系统:Windows 10,版本2004(OS Build 19041.630)

我的build.sbt文件:

scalaVersion := "2.12.10"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.1"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.0.1"
libraryDependencies += "org.apache.bahir" %% "spark-streaming-twitter" % "2.4.0"
libraryDependencies += "org.apache.httpcomponents" % "httpclient" % "4.5.12"
libraryDependencies += "commons-io" % "commons-io" % "2.8.0"
libraryDependencies += "org.apache.spark" % "spark-sql-kafka-0-10_2.12" % "3.0.1"

这是我尝试运行的Scala代码,它读取一个主题(quickstart-events)并将其发表到另一个主题(aux-output)。
package kafka

import org.apache.spark.sql.SparkSession

object kafkaRunner {
   def main(args: Array[String]): Unit = {
     val spark = SparkSession.builder()
       .appName("Kafka First App")
       .master("local[*]")
       .getOrCreate()

     import spark.implicits._

     val df = spark
       .readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "localhost:9092")
       .option("subscribe", "quickstart-events")
       .load()

     df
       .writeStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "localhost:9092")
       .option("topic", "aux-output")
       .option("checkpointLocation", "/tmp/kafka-checkpoint")
       .start()
       .awaitTermination()
  }
}

我已经运行了这个程序很多次,为了重新开始,我删除了Zookeeper和Kafka的/tmp文件。我不知道这些文件有多重要,所以请谨慎操作。我删除了以下三个目录:

  • /tmp/kafka-logs
  • /tmp/zookeeper
  • /tmp/kafka-checkpoint(这是我在程序中设置的一个目录,你的可能不同,但是当我没有设置时,Spark会报错)。

接下来,我从我的Ubuntu Kafka目录中运行了这些命令。每个命令在一个单独的终端窗口中运行。

[terminal window 1 - zookeeper server]
bin/zookeeper-server-start.sh config/zookeeper.properties

[terminal window 2 - kafka server, **wait until zookeeper finishes loading before running**]
bin/kafka-server-start.sh config/server.properties

[terminal window 3 - create our 2 topics, then run the producer for **quickstart**]
(the following three commands were run in the same window)

bin/kaftopics.sh --create.sh --topic quickstart-events --bootstrap-server localhost:9092

bin/kaftopics.sh --create.sh --topic aux-output --bootstrap-server localhost:9092

bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092

[terminal window 4 - create a consumer for **quickstart** channel]
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092

[terminal window 5 - create a consumer for **aux-out** channel]
bin/kafka-console-consumer.sh --topic aux-out --from-beginning --bootstrap-server localhost:9092

[terminal window 6 - use to run sbt]

我花了一些时间在生产者(窗口3)中输入了几行代码,并在快速启动消费者(窗口4)中寻找输出。辅助输出(窗口5)中不应该显示任何内容,这将在程序在sbt中运行时生成。

然后我运行了我的程序。我没有将项目从Windows移动,而是在Ubuntu中导航到Windows目录(/mnt/c/User/me/lots/of/dir/kafkaProject)。我在带有build.sbt的目录中启动了sbt。一旦sbt加载完成,我就执行'compile'和'run'

它开始像一个Spark作业一样进行处理,但是接着文本开始飞过。这时候你应该在aux-out中看到来自quickstart主题的输入输出。

在程序运行时,窗口3的生产者中输入的文本应该会显示在4和5中。

有一件事我没有提到,那就是在尝试运行程序几次后失败了,我执行了"wsl.exe --shutdown"命令,并重新启动了所有的Windows系统,以便重新开始。如果你遇到说主题丢失的错误,请尝试更改主题名称并重新开始。我发现有时之前使用过但无法正常工作的主题会损坏。我相信还有其他一些我尚未发现的临时文件在缓存这些主题,但一旦我解决了问题,我就继续进行了。

祝你好运!


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接