Writing a Spark DataFrame to a PostgreSQL database

The Spark cluster is set up as follows:

from pyspark import SparkConf

conf['SparkConfiguration'] = SparkConf() \
    .setMaster('yarn-client') \
    .setAppName("test") \
    .set("spark.executor.memory", "20g") \
    .set("spark.driver.maxResultSize", "20g") \
    .set("spark.executor.instances", "20") \
    .set("spark.executor.cores", "3") \
    .set("spark.memory.fraction", "0.2") \
    .set("user", "test_user") \
    .set("spark.executor.extraClassPath", "/usr/share/java/postgresql-jdbc3.jar")

When I try to write the DataFrame to the Postgres database with the following code:

from pyspark.sql import DataFrameWriter

my_writer = DataFrameWriter(df)

url_connect = "jdbc:postgresql://198.123.43.24:1234"
table = "test_result"
mode = "overwrite"
properties = {"user": "postgres", "password": "password"}

my_writer.jdbc(url_connect, table, mode, properties)

I get the following error:

Py4JJavaError: An error occurred while calling o1120.jdbc.
: java.sql.SQLException: No suitable driver
    at java.sql.DriverManager.getDriver(DriverManager.java:278)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$2.apply(JdbcUtils.scala:50)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$2.apply(JdbcUtils.scala:50)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createConnectionFactory(JdbcUtils.scala:49)
    at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:278)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)

Can anyone offer some suggestions on this issue? Thanks!

3 Answers

Try write.jdbc and pass in the parameters individually, creating them outside the write.jdbc() call. Also check which port Postgres accepts writes on: for me it is 5432 for Postgres 9.6 and 5433 for Postgres 8.4.

mode = "overwrite"
url = "jdbc:postgresql://198.123.43.24:5432/kockpit"
properties = {"user": "postgres", "password": "password", "driver": "org.postgresql.Driver"}
data.write.jdbc(url=url, table="test_result", mode=mode, properties=properties)
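
For completeness, here is a self-contained sketch of this approach, assuming Spark 2.x (SparkSession), a placeholder driver jar path, and the host, database, and credentials from the snippet above:

from pyspark.sql import SparkSession

# Sketch only: the jar path is a placeholder and the connection details
# are copied from the answer above -- adjust both to your environment.
spark = SparkSession.builder \
    .appName("pg-write-test") \
    .config("spark.jars", "/path/to/postgresql-jdbc.jar") \
    .getOrCreate()

# Small throwaway DataFrame to exercise the write path
data = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])

url = "jdbc:postgresql://198.123.43.24:5432/kockpit"
properties = {"user": "postgres", "password": "password", "driver": "org.postgresql.Driver"}

# mode="overwrite" drops and recreates test_result if it already exists
data.write.jdbc(url=url, table="test_result", mode="overwrite", properties=properties)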

Have you downloaded the PostgreSQL JDBC driver? Get it here: https://jdbc.postgresql.org/download.html

For the plain pyspark shell, you need to set the SPARK_CLASSPATH environment variable:

$ export SPARK_CLASSPATH=/path/to/downloaded/jar
$ pyspark

When submitting a script with spark-submit, use the --driver-class-path flag:

$ spark-submit --driver-class-path /path/to/downloaded/jar script.py
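
If you would rather configure this from code than from the shell, the same classpath settings can be expressed on a SparkConf. A sketch, using the same placeholder jar path; note that extraClassPath settings are read when the JVM launches, so they only take effect on a freshly created context:

from pyspark import SparkConf, SparkContext

jar = "/path/to/downloaded/jar"  # same placeholder as above

conf = SparkConf() \
    .setAppName("test") \
    .set("spark.driver.extraClassPath", jar) \
    .set("spark.executor.extraClassPath", jar)

# Must run before any SparkContext exists: extraClassPath is applied
# when the driver/executor JVMs start, not afterwards.
sc = SparkContext(conf=conf)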

Perhaps you can try passing the JDBC driver class explicitly (note that you may need to put the driver jar on the classpath of every Spark node):

df.write.option('driver', 'org.postgresql.Driver').jdbc(url_connect, table, mode, properties)

Thanks for your reply. It gives the following error: TypeError: 'DataFrameWriter' object is not callable. - Yiliang
@Yiliang, sorry, in pyspark write is not a function; you should use df.write instead of df.write(). My mistake. - Daniel de Paula
Thanks, Daniel. Now I am getting java.lang.NullPointerException. Any ideas? - Yiliang
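
Putting Daniel's correction together in one place, a minimal corrected sketch. The "/kockpit" database name in the URL is an assumption borrowed from the first answer, since a JDBC URL with no database name is one plausible source of the NullPointerException mentioned above:

url_connect = "jdbc:postgresql://198.123.43.24:5432/kockpit"
mode = "overwrite"
properties = {"user": "postgres", "password": "password"}

# df.write is a property, not a method -- no parentheses
df.write \
    .option("driver", "org.postgresql.Driver") \
    .jdbc(url_connect, table="test_result", mode=mode, properties=properties)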
