How to write to a PostgreSQL hstore column using a Spark Dataset

I am trying to write a Spark Dataset to an existing PostgreSQL table (I cannot change the table metadata, such as column types). One of the table's columns is of type HStore, and it is causing trouble.
When I launch the write, I see the following exception (here the original map is empty, which escapes to an empty string):
Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO part_d3da09549b713bbdcd95eb6095f929c8 (.., "my_hstore_column", ..) VALUES (..,'',..) was aborted.  Call getNextException to see the cause.
    at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:136)
    at org.postgresql.core.v3.QueryExecutorImpl$1.handleError(QueryExecutorImpl.java:419)
    at org.postgresql.core.v3.QueryExecutorImpl$ErrorTrackingResultHandler.handleError(QueryExecutorImpl.java:308)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2004)
    at org.postgresql.core.v3.QueryExecutorImpl.flushIfDeadlockRisk(QueryExecutorImpl.java:1187)
    at org.postgresql.core.v3.QueryExecutorImpl.sendQuery(QueryExecutorImpl.java:1212)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:351)
    at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:1019)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:222)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:300)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:299)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:902)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:902)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.postgresql.util.PSQLException: ERROR: column "my_hstore_column" is of type hstore but expression is of type character varying

Here is how I do it:

def escapePgHstore[A, B](hmap: Map[A, B]) = {
  // Render the map as an hstore literal: comma-separated "key"=>"value"
  // pairs (hstore also accepts quoted values, which is safer in general).
  hmap.map { case (key, value) => s""""${key}"=>"${value}"""" }.mkString(",")
}
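(The registration of the UDF used below is elided from the post; presumably it wraps the helper above, roughly like this sketch, assuming Map[String, Long] values as mentioned later:)

import org.apache.spark.sql.functions.udf

// Hypothetical registration; the original post does not show this step.
val escape_pg_hstore_udf = udf(escapePgHstore[String, Long] _)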
...
val props = new Properties()
props.put("user", "xxxxxxx")
props.put("password", "xxxxxxx")

ds.withColumn("my_hstore_column", escape_pg_hstore_udf($"original_column"))
  .drop("original_column")
  .coalesce(1).write
  .mode(org.apache.spark.sql.SaveMode.Append)
  .option("driver", "org.postgresql.Driver")
  .jdbc(jdbcUrl, hashedTablePartName, props)

If I don't escape original_column (a Map[String, Long]) to a String with escapePgHstore, I see the following error instead:
java.lang.IllegalArgumentException: Can't get JDBC type for map<string,bigint>
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getJdbcType$2.apply(JdbcUtils.scala:137)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getJdbcType$2.apply(JdbcUtils.scala:137)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getJdbcType(JdbcUtils.scala:136)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$7.apply(JdbcUtils.scala:293)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$7.apply(JdbcUtils.scala:292)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:292)
    at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:441)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

What is the right way to make Spark write a valid hstore data type?
2 Answers


It turns out I just needed to let Postgres try to guess the appropriate type for my column, by setting stringtype to unspecified in the connection properties, as described in the official documentation:

props.put("stringtype", "unspecified")

Now it works perfectly!
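For context, here is a minimal sketch of the question's write with that single property added (all names are the ones from the question; nothing else changes):

import java.util.Properties
import org.apache.spark.sql.SaveMode

val props = new Properties()
props.put("user", "xxxxxxx")
props.put("password", "xxxxxxx")
// Send strings with an unspecified parameter type, so Postgres coerces
// them to the target column type (hstore here) at insert time.
props.put("stringtype", "unspecified")

ds.withColumn("my_hstore_column", escape_pg_hstore_udf($"original_column"))
  .drop("original_column")
  .coalesce(1).write
  .mode(SaveMode.Append)
  .option("driver", "org.postgresql.Driver")
  .jdbc(jdbcUrl, hashedTablePartName, props)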


This was extremely helpful! You saved me a ton of time, and this is the only information I found on the topic. That said, I found one key point: the hstore column you are writing to must already exist. If Spark's SaveMode is set to Overwrite, Postgres never gets a chance to parse the text into an hstore column; Spark just tells Postgres it is a text column. - mtrewartha
Just to clarify, it is important to use SaveMode.Append on a pre-created table with the uuid column data type. If you try SaveMode.Overwrite, Spark will not create a table with a uuid column type (Spark 2.3.0 fails with: DataType uuid is not supported). - ecoe
Great, this worked for me and saved a lot of time. - Nikunj Kakadiya
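Following up on the comments above: since the target table must already exist for SaveMode.Append to work, one option is to pre-create it over plain JDBC before the Spark write. A minimal sketch (my_table, id, and my_hstore_column are hypothetical names):

import java.sql.DriverManager

val conn = DriverManager.getConnection(jdbcUrl, "xxxxxxx", "xxxxxxx")
try {
  val stmt = conn.createStatement()
  // hstore ships as an extension; enable it once per database.
  stmt.execute("CREATE EXTENSION IF NOT EXISTS hstore")
  // Pre-create the table so Spark appends rather than defining the schema.
  stmt.execute(
    """CREATE TABLE IF NOT EXISTS my_table (
      |  id bigint,
      |  my_hstore_column hstore
      |)""".stripMargin)
} finally {
  conn.close()
}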

Here is PySpark code for writing a DataFrame with HSTORE, JSON, and JSONB columns to a PostgreSQL table. In general, for any complex data type that exists in PostgreSQL but cannot be represented in a Spark DataFrame, you need to specify stringtype="unspecified" in the options or properties passed to whichever function writes the DataFrame to SQL.
Below is an example of writing a Spark DataFrame to a PostgreSQL table using the write() function:
dataframe.write.format("jdbc") \
    .options(driver=driver, user=username, password=password,
             url=target_database_url, dbtable=table,
             stringtype="unspecified") \
    .mode("append") \
    .save()
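Note that the PostgreSQL JDBC driver also accepts stringtype as a URL parameter, so the same effect can be had with a connection URL such as jdbc:postgresql://host:5432/db?stringtype=unspecified.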
