在独立的Pyspark中添加Jar文件

32

我正在启动一个pyspark程序:

$ export SPARK_HOME=
$ export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.9-src.zip
$ python

而且Python代码:

from pyspark import SparkContext, SparkConf

SparkConf().setAppName("Example").setMaster("local[2]")
sc = SparkContext(conf=conf)

如何添加jar依赖项,例如Databricks csv jar?使用命令行,我可以像这样添加包:

$ pyspark/spark-submit --packages com.databricks:spark-csv_2.10:1.3.0 

但是我没有使用这些。该程序是更大工作流的一部分,不使用spark-submit 我应该能够运行./foo.py程序并且它应该能够正常工作。

  • 我知道你可以为extraClassPath设置spark属性,但你必须将JAR文件复制到每个节点吗?
  • 尝试过conf.set("spark.jars", "jar1,jar2") ,但会有一个py4j CNF异常。
7个回答

52

2021-01-19 更新

有很多方法可以实现此目的(设置环境变量,添加到 $SPARK_HOME/conf/spark-defaults.conf 等等...)其他回答已经涵盖了这些。我想为那些特别想从Python脚本Jupyter笔记本中执行此操作的人添加一个回答。

在创建Spark会话时,您可以添加一个 .config(),以引入特定的Jar文件(在我的情况下,我想要加载 Kafka 包):

spark = SparkSession.builder.appName('my_awesome')\
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1')\
    .getOrCreate()

使用这行代码,我不需要做任何其他事情(不需要更改环境变量或配置文件)。

  • 注意1:JAR文件将会动态地下载,您不需要手动下载。
  • 注意2:确保版本与您想要的匹配,所以在上面的例子中,我的Spark版本是3.0.1,所以我在结尾处加上了:3.0.1

5
这个选项在其他地方似乎经常被忽略或未被记录…正如所述,对于Jupyter用户来说,这是一个很好的解决方案。 - Luke W
4
使用"spark.jars"参数来指定Jars文件。 - Saksham
2
这个答案非常适合那些需要在代码中启动Spark环境并需要在运行时拉取jar包的人。我成功地使用它将GraphFrames jar包加载到一些非常受限制的系统上,这些系统没有提供构建自定义SparkConf文件的方法。感谢您清晰的示例! - bsplosion
2
@briford-wylie 但是你是否需要下载并将jar文件放在某个地方?我在Spark的“.../jars/”目录中对每个jar执行了jar -tvf fileName.jar | grep -i kafka,但没有找到kafka相关的内容。你的在哪里?我不一定对kafka本身感兴趣;我只是在跟随你的示例尝试将其推广。 - NYCeyes
将graphframes的jar包添加到项目中时出现问题,因为它们已从Maven Central移动到Bintree。是否可以添加自定义仓库以进行查找? - user1264641
显示剩余3条评论

14

任何依赖项都可以使用 spark.jars.packages(设置spark.jars也应该可以)属性在$SPARK_HOME/conf/spark-defaults.conf文件中传递。它应该是一个逗号分隔的坐标列表。

并且包或类路径属性必须在JVM启动之前设置,并且这发生在SparkConf初始化期间。 这意味着SparkConf.set方法不能在此处使用。

另一种方法是在初始化SparkConf对象之前设置PYSPARK_SUBMIT_ARGS环境变量:

import os
from pyspark import SparkConf

SUBMIT_ARGS = "--packages com.databricks:spark-csv_2.11:1.2.0 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS

conf = SparkConf()
sc = SparkContext(conf=conf)

2
这个解决方案似乎对我不起作用,至少在笔记本中;我仍然会收到类未找到错误。实际上,我设置的任何环境变量似乎都没有被Spark获取。看起来os.environ只为Python内核所运行的进程设置环境,但任何子进程都不会获取这些环境变量。换句话说,它没有执行相当于export…的操作。有什么想法吗? - santon
subprocess.Popen 接受 env 参数,您可以通过它传递当前环境的副本。 - zero323
这确实有效,但我不知道它在幕后是如何工作的。将PYSPARK_SUBMIT_ARGS设置为不完整的SUBMIT_ARGS,而没有spark-submit,在运行Spark作业时会自动导入包。是否有任何文档说明它是如何工作的? - JohnWick

6
我遇到了一个类似的问题,与另一个“MongoDB Connector for Spark”(mongo-spark-connector)的jar有关。但是一个大警告是,我是通过在conda中使用pyspark安装Spark的(conda install pyspark)。因此,所有针对Spark特定答案的帮助并不完全有用。对于那些使用conda安装的人,这是我凑合出来的过程:

1)找到您的pyspark/jars的位置。我的路径是: ~/anaconda2/pkgs/pyspark-2.3.0-py27_0/lib/python2.7/site-packages/pyspark/jars

2)下载jar文件到步骤1中找到的路径中,从此位置

3) 现在您应该能够运行类似于这样的代码(代码摘自MongoDB官方教程,使用Briford Wylie上面的答案):

from pyspark.sql import SparkSession

my_spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/spark.test_pyspark_mbd_conn") \
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1:27017/spark.test_pyspark_mbd_conn") \
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.2.2') \
    .getOrCreate()

声明:

1)我不确定这个答案是否适合在此处或者SO问题中发布,请建议更好的地方,我会将其转移。

2)如果您认为我有错误或对上述过程有改进意见,请发表评论,我会进行修订。


如果不使用Maven,您将如何处理“spark.jars.packages”? - Brian
你是什么意思?如果这是一个错误,你想要发布一个堆栈跟踪吗? - ximiki

3
经过多次尝试,我终于找到了答案。这个答案是关于使用spark-csv jar的。在你的硬盘上创建一个文件夹,比如D:\Spark\spark_jars。将以下jar包放入其中:
  1. spark-csv_2.10-1.4.0.jar(这是我使用的版本)
  2. commons-csv-1.1.jar
  3. univocity-parsers-1.5.1.jar
第二和第三个jar包是spark-csv需要的依赖项,因此这两个文件也需要下载。进入你已经下载Spark的conf目录,在spark-defaults.conf文件中添加以下行:

spark.driver.extraClassPath D:/Spark/spark_jars/*

星号应该包含所有的jar包。现在运行Python,创建SparkContext、SQLContext,然后像往常一样使用spark-csv。
sqlContext.read.format('com.databricks.spark.csv').\
options(header='true', inferschema='true').\
load('foobar.csv')

3

通过使用 PySpark、PostgreSQL 和 Apache Sedona,我学会了用两种方法来解决这个问题。


方法1:下载JAR文件并添加到spark.jars

为了在Spark上使用PostgreSQL,我需要将JDBC驱动程序(JAR文件)添加到PySpark中。

首先,在与我的程序同级别创建一个jars目录,并将postgresql-42.5.0.jar文件存储在其中。

然后,我只需使用以下配置将其添加到SparkSession中:SparkSession.builder.config("spark.jars", "{JAR_FILE_PATH}")

spark = (
    SparkSession.builder
    .config("spark.jars", "jars/postgresql-42.5.0.jar")
    .master("local[*]")
    .appName("Example - Add a JAR file")
    .getOrCreate()
)

方法二:使用Maven Central坐标和spark.jars.packages

如果您的依赖JAR文件可在Maven上获得,您可以使用此方法,无需维护任何JAR文件

步骤

  1. Maven Central Repository Search上找到你的软件包Example - postgresql

  2. 选择正确的软件包构件并复制Maven Central coordinateExample - coordinate

  3. 在Python中,调用SparkSession.builder.config("spark.jars.packages", "{MAVEN_CENTRAL_COORDINATE}")

    spark = (
        SparkSession.builder
        .appName('Example - adding many Maven packages')
    
        .config("spark.serializer", KryoSerializer.getName)
        .config("spark.kryo.registrator", SedonaKryoRegistrator.getName)
        .config("spark.jars.packages",
                "org.postgresql:postgresql:42.5.0,"
                + "org.apache.sedona:sedona-python-adapter-3.0_2.12:1.2.1-incubating,"
                + "org.datasyslab:geotools-wrapper:1.1.0-25.2")
    
        .getOrCreate()
     )
    

使用 sparks.jars.packages 的优点

  • 您可以添加多个软件包
  • 您无需管理大型 JAR 文件

使用 sparks.jars.packages 的缺点

.config("sparks.jars.packages", ...) 只接受一个参数,因此为了添加多个软件包,您需要使用,作为分隔符连接软件包坐标

"org.postgresql:postgresql:42.5.0,"
+ "org.apache.sedona:sedona-python-adapter-3.0_2.12:1.2.1-incubating,"
+ "org.datasyslab:geotools-wrapper:1.1.0-25.2"

*** 字符串不容忍您的代码中出现换行空格制表符,这将导致严重的错误日志并产生无关紧要的错误。


1
import os
import sys
spark_home = os.environ.get('SPARK_HOME', None)
sys.path.insert(0, spark_home + "/python")
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.4-src.zip'))

它来了...

sys.path.insert(0, <PATH TO YOUR JAR>)

然后...
import pyspark
import numpy as np

from pyspark import SparkContext

sc = SparkContext("local[1]")
.
.
.

1
sys.path 用于 Python 包,而不是 JAR 包。 - iggy

0
在yml清单中的sparkoperator中,您可以使用sparkConf中的"spark.jars.packages"来安装多个包。
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: test
  namespace: default
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  sparkVersion: "3.3.2"
  sparkConf:
    "spark.jars.packages": "org.apache.hadoop:hadoop-aws:3.3.2,com.amazonaws:aws-java-sdk-bundle:1.12.99"

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接