用Python编程以编程方式启动HiveThriftServer

3
在spark-shell(scala)中,我们导入org.apache.spark.sql.hive.thriftserver._,用于以编程方式启动Hive Thrift服务器程序,为特定的hive上下文启动HiveThriftServer2.startWithContext(hiveContext)以公开该特定会话的注册临时表。
那么如何在Python中使用相同的方法呢?是否有一个Python包/API可导入HiveThriftServer?欢迎提出任何其他想法/建议。
我们已经使用pyspark创建了一个数据框。
谢谢!
Ravi Narayanan

为什么需要一个Thrift服务器既然它只是临时表?难道你不能创建自己的HiveContext来连接本地临时创建的元数据存储吗? - user1314742
顺便问一下,你为什么需要从你的代码中启动它? - user1314742
1
如果我们将Thrift服务器作为守护进程启动,我们将无法查看临时表(会话与启动HiveContext的会话不同,并且临时表仅适用于特定会话)。 - Ravi Narayanan
你搞清楚怎么做了吗? - user1158559
@user1158559,你解决这个问题了吗? - stackit
显示剩余2条评论
2个回答

5
你可以使用py4j java网关导入它。以下代码适用于Spark 2.0.2,可以通过Beeline查询在Python脚本中注册的临时表。
from py4j.java_gateway import java_import
java_import(sc._gateway.jvm,"")

spark = SparkSession \
        .builder \
        .appName(app_name) \
        .master(master)\
        .enableHiveSupport()\
        .config('spark.sql.hive.thriftServer.singleSession', True)\
        .getOrCreate()
sc=spark.sparkContext
sc.setLogLevel('INFO')

#Start the Thrift Server using the jvm and passing the same spark session corresponding to pyspark session in the jvm side.
sc._gateway.jvm.org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.startWithContext(spark._jwrapped)

spark.sql('CREATE TABLE myTable')
data_file="path to csv file with data"
dataframe = spark.read.option("header","true").csv(data_file).cache()
dataframe.createOrReplaceTempView("myTempView")

然后前往Beeline检查是否已正确启动:

in terminal> $SPARK_HOME/bin/beeline
beeline> !connect jdbc:hive2://localhost:10000
beeline> show tables;

它应该显示在Python中创建的表和临时表/视图,包括"myTable"和"myTempView"。为了看到临时视图,必须具有相同的Spark会话(请参见答案:避免以编程方式创建上下文启动HiveThriftServer2)。注意:即使从终端启动Thrift服务器并连接到相同的元存储,也可以访问Hive表,但无法访问临时视图,因为它们位于Spark会话中而不是写入元存储中。)

0
对于Spark 3,以下内容适用:
from py4j.java_gateway import java_import
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
sc = spark.sparkContext

java_import(sc._jvm, "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2")
args = sys.argv[1:]
java_args = sc._gateway.new_array(sc._gateway.jvm.java.lang.String, len(args))

for i, arg in enumerate(args):
    java_args[i] = arg
sc._jvm.org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(java_args)

请注意,HiveThriftServer2类的main方法调用了startWithContext方法。(请参考这里的源代码)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接