Pyspark无法找到数据源:kafka。

8
我正在处理Kafka流数据,并尝试将其与Apache Spark集成。但是,在运行时我遇到了问题。我收到以下错误。
这是我正在使用的命令。
df_TR = Spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "taxirides").load()
错误:
Py4JJavaError: 调用o77.load()时发生错误:java.lang.ClassNotFoundException: 找不到数据源:kafka。请在http://spark.apache.org/third-party-projects.html中查找软件包。
我该如何解决这个问题?
注意:我在Jupyter Notebook中运行此操作。
findspark.init('/home/karan/spark-2.1.0-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
Spark = SparkSession.builder.appName('KafkaStreaming').getOrCreate()
from pyspark.sql.types import *
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

到这里为止一切正常(上面的代码)。

df_TR = Spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "taxirides").load()

这是出问题的地方(上面的代码)。

我正在遵循的博客链接:https://www.adaltas.com/en/2019/04/18/spark-streaming-data-pipelines-with-structured-streaming/

1个回答

10

编辑

使用spark.jars.packagesPYSPARK_SUBMIT_ARGS更有效。

参考 - PySpark - NoClassDefFoundError: kafka/common/TopicAndPartition


不清楚您是如何运行代码的。继续阅读博客,您会发现:

spark-submit \
  ...
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 \
  sstreaming-spark-out.py

看起来您忘记添加--packages标志了。

在Jupyter中,您可以添加这个标志。

import os

# setup arguments
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0'

# initialize spark
import pyspark, findspark
findspark.init()
注意:_2.11:2.4.0 需要与您的 Scala 和 Spark 版本对齐...根据问题,您的应该是 Spark 2.1.0。

添加import OS后,我现在又遇到了另一个错误。 Py4JJavaError:调用o27.load时发生错误。 :java.lang.ClassNotFoundException:无法找到数据源:kafka。 - P Kernel
1
@PKernel 这是因为 spark-sql-kafka 的版本与您当前运行的 Spark 版本不匹配。 - Krishna Kumar Singh

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接