sqlContext HiveDriver错误,出现SQLException: Method not supported。

22
我一直尝试使用sqlContext.read.format("jdbc").options(driver="org.apache.hive.jdbc.HiveDriver")将Hive表格导入到Spark,但没有成功。我已经进行了研究并阅读了以下内容:如何从Spark连接到远程Hive服务器Spark 1.5.1不能与hive jdbc 1.2.0一起工作http://belablotski.blogspot.in/2016/01/access-hive-tables-from-spark-using.html。我使用了最新的Hortonworks Sandbox 2.6,并在社区中问了同样的问题:https://community.hortonworks.com/questions/156828/pyspark-jdbc-py4jjavaerror-calling-o95load-javasql.html?childToView=156936#answer-156936。我想通过pyspark实现非常简单的操作:
df = sqlContext.read.format("jdbc").options(driver="org.apache.hive.jdbc.HiveDriver", url="jdbc:hive2://localhost:10016/default", dbtable="sample_07",user="maria_dev", password="maria_dev").load()

这给了我以下错误:

17/12/30 19:55:14 INFO HiveConnection: Will try to open client transport with JDBC Uri: jdbc:hive2://localhost:10016/default
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark-client/python/pyspark/sql/readwriter.py", line 139, in load
    return self._df(self._jreader.load())
  File "/usr/hdp/current/spark-client/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/usr/hdp/current/spark-client/python/pyspark/sql/utils.py", line 45, in deco
    return f(*a, **kw)
  File "/usr/hdp/current/spark-client/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o119.load.
: java.sql.SQLException: Method not supported
at org.apache.hive.jdbc.HiveResultSetMetaData.isSigned(HiveResultSetMetaData.java:143)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:136)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:91)
at org.apache.spark.sql.execution.datasources.jdbc.DefaultSource.createRelation(DefaultSource.scala:57)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:748)

使用beeline,它可以正常工作。
beeline> !connect jdbc:hive2://localhost:10016/default maria_dev maria_dev
Connecting to jdbc:hive2://localhost:10016/default
Connected to: Spark SQL (version 2.1.1.2.6.1.0-129)
Driver: Hive JDBC (version 1.2.1000.2.6.1.0-129)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://localhost:10016/default> select * from sample_07 limit 2;
+----------+-------------------------+------------+---------+--+
|   code   |       description       | total_emp  | salary  |
+----------+-------------------------+------------+---------+--+
| 00-0000  | All Occupations         | 134354250  | 40690   |
| 11-0000  | Management occupations  | 6003930    | 96150   |
+----------+-------------------------+------------+---------+--+

我也可以这样做:
spark = SparkSession.Builder().appName("testapp").enableHiveSupport().‌​getOrCreate()
spark.sql("select * from default.sample_07").collect()

但这样会直接读取Hive元数据。我想使用JDBC连接到Spark Thrift Server以实现细粒度的安全控制。
我可以这样使用PostgreSQL:
sqlContext.read.format("jdbc").options(driver="org.postgresql.Driver")

我也可以使用Scala java.sql.{DriverManager, Connection, Statement, ResultSet} 来创建JDBC连接,以作为客户端访问Spark。但这基本上会将所有的数据加载到内存中,然后手动重新创建Dataframe。
因此问题是:是否有一种方法可以在不像Scala那样将数据加载到内存中的JDBC客户端中创建Spark dataframe,并且不使用像上述示例中的SparkSession.Builder()?我的用例需要处理细粒度安全性。

1
不使用Hortonworks的人 - 可以包括Spark版本吗? - Alper t. Turker
Beeline只安装在本地了吗?即使它安装在本地,你可以通过给它的IP地址来尝试连接。据我所知,Beeline可能不会通过localhost进行连接。请尝试一次,让我知道它是否有效。也许在yarn客户端模式下,它没有选择local host。 - Manu Gupta
这是Spark 2.2。但坦白地说,这并不重要。如果最新版本能够正常工作,我可以更改版本。我尝试了localhost和FQDN,结果相同... - HP.
@HP。抱歉回复晚了。随时欢迎联系我聊天。 - loneStar
@Achyuth 哪个解决方案? - HP.
显示剩余3条评论
2个回答

2

实际上我已经调查过了。Hotornworks和Cloudera停止了通过Thrift Server从Spark连接到Hive的支持。

所以你正在做一些不可能的事情。

https://www.cloudera.com/documentation/spark2/latest/topics/spark2_known_issues.html#ki_thrift_server

链接说Thrift被禁用,但它是特别针对从Spark连接到Hive的。除了Hive之外,我能够从Spark连接到所有类型的数据库。

因此,您必须使用不同的授权方式。

由于Spark对象直接连接到Hive,他们正在删除Thrift支持。

根据您之前的问题,它能够读取数据,但读取错误的数据。 Spark 2.2 Thrift server error on dataframe NumberFormatException when query Hive table

代码

>>> df = sqlContext.read.format("jdbc").options(driver="org.apache.hive.jdbc.HiveDriver", url="jdbc:hive2://localhost:10016/default", dbtable="test4",user="hive", password="hive").option("fetchsize", "10").load()
>>> df.select("*").show()
+---+----+
| id|desc|
+---+----+
| id|desc|
| id|desc|
+---+----+

这里的问题在于Hive中默认的方言使用双引号引用标识符。像SELECT “dw_date” FROM table…这样的SQL查询将被解析为选择字符串文字,而不是名为“dw_date”的列。通过用反引号替换引号,似乎问题得到了解决。然而,在我的测试中,从Hive获取的列名都带有前缀,如table.dw_date。但是您不能直接将反引号包装在其中,例如table.dw_date。相反,我们需要逐个包装每个部分。
import org.apache.spark.sql.jdbc.JdbcDialect
    private case object HiveDialect extends JdbcDialect {
      override def canHandle(url : String): Boolean = url.startsWith("jdbc:hive2")
      override def quoteIdentifier(colName: String): String = {
        colName.split(‘.’).map(part => s”`$part`”).mkString(“.”)
      }
    }

请按照下面的帖子实施解决方案。

https://medium.com/@viirya/custom-jdbc-dialect-for-hive-5dbb694cc2bd

https://medium.com/@huaxing/customize-spark-jdbc-data-source-to-work-with-your-dedicated-database-dialect-beec6519af27

注册方言。
JdbcDialects.registerDialect(HiveDialect)

然后Hive JDBC就可以工作了。

这对于Cloudera来说是正确的,但我怀疑对于Hortonworks来说是否也是如此 https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.3/bk_spark-component-guide/content/config-sts.html,除非你有公司内部可靠的消息来源。不管怎样,困扰我的是他们在文档中没有明确说明不支持Spark到Thrift JDBC。如果他们说了,我会很高兴并继续前进... :) - HP.
嗨,当我运行上述代码时,在spark-shell中出现了“错误:找不到类型JdbcDialect”的错误,请问您能帮我修复这个问题吗? - Gowtham SB
@GowthamSB 包含了导入 - loneStar

2

我不确定我是否正确理解了你的问题,但是从我的理解来看,您需要将一个Hive表格转换为数据框。为此,您不需要JDBC连接。在您提供的示例链接中,他们试图连接到不同的数据库(关系型数据库),而不是Hive。

请参考下面的方法,使用Hive上下文可以将表格转换为数据框。

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SQLContext}

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setAppName("APPName")
    val sc = new SparkContext(sparkConf)
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    val sqlContext = new SQLContext(sc)

val hive_df = hiveContext.sql("select * from schema.table").first()

//other way
// val hive_df= hiveContext.table ("SchemaName.TableName")

//Below will print the first line
df.first()
//count on dataframe
df.count()

}

如果您确实想要使用JDBC连接,我有以下示例可供参考,其中包括我用于Oracle数据库的示例,这可能会对您有所帮助。
val oracle_data = sqlContext.load("jdbc", Map("url" -> "jdbc:oracle:thin:username/password//hostname:2134/databaseName", "dbtable" -> "Your query tmp", "driver" -> "oracle.jdbc.driver.OracleDriver"));

1
我了解你上面的例子。基本上是使用Spark直接从Hive Metastore获取数据。我的要求是使用带有用户/密码的JDBC连接到Spark Thrift Server,因为系统可能具有LDAP或Active Directory身份验证,以实现细粒度安全性(行/列过滤和掩码)。直接进入Metastore可以工作,但那只是一个用户访问一个集群。 - HP.
我有多个用户,所以需要传递凭据。我考虑使用Kerberos来保护Hive Metastore,但这不能给我细粒度的安全性(只能在表级别上拒绝或允许访问,而不能在行/列级别上)。 - HP.

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接