SparkSQL和UDT

4

我尝试使用SparkSQL(v.1.3.0)访问PostgreSQL数据库。在这个数据库中,我有一个表。

CREATE TABLE test (
 id bigint,
 values double precision[]
);

访问表格时,我使用

val sparkConf = new SparkConf().setAppName("TestRead").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)

val jdbcDF = sqlContext.load("jdbc", Map(
  "url" -> "jdbc:postgresql://...",
  "dbtable" -> "schema.test",
  "user" -> "...",
  "password" -> "..."))

sqlContext.sql("SELECT * FROM schema.test")

然而,每次我尝试访问包含此数组的表时,我都会收到一个“java.sql.SQLException:Unsupported type 2003”的错误消息。
我在Spark测试代码中找到了一个示例,用于为二维点创建Spark UDT(请参见ExamplePointUDT.scala)。但是,我不明白我如何可能使用这段代码。

1
当我今天研究SparkSQL UDT时,发现它还不是一个稳定的公共API,参见邮件列表源代码注释 - Randall Whitman
1
即使通过Spark使用JDBC访问Hive2服务器,如下所示hive.load("jdbc", Map( "url" -> "jdbc:hive2://ip:port/;auth=noSasl", "driver" -> "org.apache.hive.jdbc.HiveDriver", "dbtable" -> "default.weeks", "user" -> "user", "password" -> "" )),我仍然会遇到此错误。 - Prikso NAI
1个回答

0

这可以通过在查询中进行转换来实现,至少在pyspark中是如此。

不要让不支持的类型到达spark,将它们转换为您的数据库中,然后在获取表格后再将它们转换回来。

我不确定语法是否正确,但大致应该是这样:

val query_table = "(SELECT id, CAST(values AS TEXT) FROM schema.test) AS casted_table"

val jdbcDF = sqlContext.load("jdbc", Map(
  "url" -> "jdbc:postgresql://...",
  "dbtable" -> query_table,
  "user" -> "...",
  "password" -> "..."))

jdbcDF.map(x => (x.id, x.values.toArray))

我很确定没有 .toArray 可以将字符串表示转换回数组,它只是占位符代码.. 但现在重点在于正确地解析它。

当然,这只是一个补丁,但它可以工作。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接