Spark DataFrame插入到JDBC中 - TableAlreadyExists异常

12

使用Spark 1.4.0,我正在尝试使用insertIntoJdbc()将Spark DataFrame中的数据插入到MemSQL数据库(应该与与MySQL数据库交互完全相同)。 但是,我不断收到Runtime TableAlreadyExists异常。

首先,我用以下方式创建MemSQL表:

CREATE TABLE IF NOT EXISTS table1 (id INT AUTO_INCREMENT PRIMARY KEY, val INT);

然后我在Spark中创建了一个简单的数据框,并尝试像这样插入到MemSQL中:

val df = sc.parallelize(Array(123,234)).toDF.toDF("val")
//df: org.apache.spark.sql.DataFrame = [val: int]

df.insertIntoJDBC("jdbc:mysql://172.17.01:3306/test?user=root", "table1", false)

java.lang.RuntimeException: Table table1 already exists.
3个回答

8
此解决方案适用于一般的JDBC连接,但@wayne的回答可能是memSQL特定情况下更好的解决方案。
从1.4.0版本开始,insertIntoJdbc貌似已被弃用,实际上使用它将调用write.jdbc()方法。
write()返回一个DataFrameWriter对象。如果您想将数据追加到表中,则必须将对象的保存模式更改为"append"。
上述问题中另一个问题是DataFrame架构与目标表的架构不匹配。
以下代码提供了来自Spark shell的工作示例。我使用`spark-shell --driver-class-path mysql-connector-java-5.1.36-bin.jar`来启动我的spark-shell会话。
import java.util.Properties

val prop = new Properties() 
prop.put("user", "root")
prop.put("password", "")  

val df = sc.parallelize(Array((1,234), (2,1233))).toDF.toDF("id", "val")   
val dfWriter = df.write.mode("append") 

dfWriter.jdbc("jdbc:mysql://172.17.01:3306/test", "table1", prop) 

2
嗨,Elbow,我正在使用Spark 1.5,即使我说了write.mode(“append”),仍然会出现表已存在的异常,你想对此发表评论吗?数据库中已经有一个名为“customer_spark”的对象。 - sri hari kali charan Tummala
1
嘿@DJElbow,我也是一样,当使用write.mode(SaveMode.Append)时,仍然会出现“表'table1'已经存在”的异常。我检查了一下,当使用'root'用户时,它运行得很好,但是当使用具有CREATE/INSERT/UPDATE权限的用户时,我就会遇到这个错误。 - marnun

3

insertIntoJDBC文档实际上是错误的;它们说表必须已经存在,但实际上如果存在,它会抛出一个错误,正如您在上面看到的:

https://github.com/apache/spark/blob/03cca5dce2cd7618b5c0e33163efb8502415b06e/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L264

我们建议使用我们的MemSQL Spark连接器,您可以在此处找到:

https://github.com/memsql/memsql-spark-connector

如果您在代码中包含该库并导入com.memsql.spark.connector._,则可以使用df.saveToMemSQL(...)将DataFrame保存到MemSQL。 您可以在此处找到我们连接器的文档:

http://memsql.github.io/memsql-spark-connector/latest/api/#com.memsql.spark.connector.DataFrameFunctions


非常好。这简化了事情。有一个编译好的JAR包可以在哪里下载吗?我找不到一个。 - DJElbow
1
如果您将 maven.memsql.com 添加为解析器,就可以在项目中将其包含为依赖项:https://github.com/memsql/memsql-spark-connector#using - Wayne Song

1

我遇到了同样的问题。将Spark版本更新至1.6.2后问题得到解决。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接