我遇到了同样的问题,解决起来相当困难。突破口是当我发现了WSL2中这个已关闭但显然仍有问题的问题。基本上,问题出在我无法从Windows 10的IntelliJ访问Ubuntu/WSL2的本地主机。所以,当我在IntelliJ中编译和运行程序时,就会出现你发布的错误。
我的设置的一些细节:
操作系统:Windows 10,版本2004(OS Build 19041.630)
我的build.sbt文件:
scalaVersion := "2.12.10"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.1"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.0.1"
libraryDependencies += "org.apache.bahir" %% "spark-streaming-twitter" % "2.4.0"
libraryDependencies += "org.apache.httpcomponents" % "httpclient" % "4.5.12"
libraryDependencies += "commons-io" % "commons-io" % "2.8.0"
libraryDependencies += "org.apache.spark" % "spark-sql-kafka-0-10_2.12" % "3.0.1"
这是我尝试运行的Scala代码,它读取一个主题(quickstart-events)并将其发表到另一个主题(aux-output)。
package kafka
import org.apache.spark.sql.SparkSession
object kafkaRunner {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("Kafka First App")
.master("local[*]")
.getOrCreate()
import spark.implicits._
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "quickstart-events")
.load()
df
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "aux-output")
.option("checkpointLocation", "/tmp/kafka-checkpoint")
.start()
.awaitTermination()
}
}
我已经运行了这个程序很多次,为了重新开始,我删除了Zookeeper和Kafka的/tmp文件。我不知道这些文件有多重要,所以请谨慎操作。我删除了以下三个目录:
- /tmp/kafka-logs
- /tmp/zookeeper
- /tmp/kafka-checkpoint(这是我在程序中设置的一个目录,你的可能不同,但是当我没有设置时,Spark会报错)。
接下来,我从我的Ubuntu Kafka目录中运行了这些命令。每个命令在一个单独的终端窗口中运行。
[terminal window 1 - zookeeper server]
bin/zookeeper-server-start.sh config/zookeeper.properties
[terminal window 2 - kafka server, **wait until zookeeper finishes loading before running**]
bin/kafka-server-start.sh config/server.properties
[terminal window 3 - create our 2 topics, then run the producer for **quickstart**]
(the following three commands were run in the same window)
bin/kaftopics.sh --create.sh --topic quickstart-events --bootstrap-server localhost:9092
bin/kaftopics.sh --create.sh --topic aux-output --bootstrap-server localhost:9092
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
[terminal window 4 - create a consumer for **quickstart** channel]
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
[terminal window 5 - create a consumer for **aux-out** channel]
bin/kafka-console-consumer.sh --topic aux-out --from-beginning --bootstrap-server localhost:9092
[terminal window 6 - use to run sbt]
我花了一些时间在生产者(窗口3)中输入了几行代码,并在快速启动消费者(窗口4)中寻找输出。辅助输出(窗口5)中不应该显示任何内容,这将在程序在sbt中运行时生成。
然后我运行了我的程序。我没有将项目从Windows移动,而是在Ubuntu中导航到Windows目录(/mnt/c/User/me/lots/of/dir/kafkaProject)。我在带有build.sbt的目录中启动了sbt
。一旦sbt
加载完成,我就执行'compile'和'run'
它开始像一个Spark作业一样进行处理,但是接着文本开始飞过。这时候你应该在aux-out中看到来自quickstart主题的输入输出。
在程序运行时,窗口3的生产者中输入的文本应该会显示在4和5中。
有一件事我没有提到,那就是在尝试运行程序几次后失败了,我执行了"wsl.exe --shutdown"命令,并重新启动了所有的Windows系统,以便重新开始。如果你遇到说主题丢失的错误,请尝试更改主题名称并重新开始。我发现有时之前使用过但无法正常工作的主题会损坏。我相信还有其他一些我尚未发现的临时文件在缓存这些主题,但一旦我解决了问题,我就继续进行了。
祝你好运!