在Pyspark中将Pandas数据框转换为Hive

10

如何将Pandas数据框发送到Hive表?

我知道如果有一个Spark数据帧,可以使用以下方法将其注册到临时表:

df.registerTempTable("table_name")
sqlContext.sql("create table table_name2 as select * from table_name")

但是当我尝试使用pandas DataFrame注册TempTable时,会出现以下错误:

AttributeError: 'DataFrame' object has no attribute 'registerTempTable'

我是否可以使用pandas DataFrame来注册临时表,或者将其转换为Spark DataFrame并使用它来注册临时表,以便我可以将其发送回Hive?


需要转换,错误很明显,Pandas数据帧中不存在该属性。 - EdChum
是的,但我不知道如何将pandas dataframe转换为spark dataframe。 我尝试使用 sqlContext.createDataFrame(df) 但它只接受模式而不是数据框中的数据。 - thenakulchawla
4个回答

5

我猜你正在尝试使用pandas的df,而不是Spark的DF

Pandas DataFrame没有registerTempTable方法。

您可以尝试从pandas DF创建Spark DF。

更新:

我在Cloudera下进行了测试(安装了Anaconda parcel,其中包括Pandas模块)。

请确保在所有的Spark worker上将PYSPARK_PYTHON设置为您的anaconda python安装路径(或另一个含有Pandas模块的路径)(通常在:spark-conf/spark-env.sh

这是我的测试结果:

>>> import pandas as pd
>>> import numpy as np
>>> df = pd.DataFrame(np.random.randint(0,100,size=(10, 3)), columns=list('ABC'))
>>> sdf = sqlContext.createDataFrame(df)
>>> sdf.show()
+---+---+---+
|  A|  B|  C|
+---+---+---+
| 98| 33| 75|
| 91| 57| 80|
| 20| 87| 85|
| 20| 61| 37|
| 96| 64| 60|
| 79| 45| 82|
| 82| 16| 22|
| 77| 34| 65|
| 74| 18| 17|
| 71| 57| 60|
+---+---+---+

>>> sdf.printSchema()
root
 |-- A: long (nullable = true)
 |-- B: long (nullable = true)
 |-- C: long (nullable = true)

有没有办法将pandas转换为Spark DF?我可以尝试在Spark df中编写所有功能,但它的灵活性不如pandas。 - thenakulchawla
是的,它只创建模式但不带数据。 一旦您尝试了,请告诉我它对您的工作效果如何。 - thenakulchawla
如果您知道另一种方法可以将数据发送回Hive而不使用临时表,请分享该方法。只要能在Hive中取回数据,我就不想使用临时表。 - thenakulchawla
好的,如果你稍后尝试,请告诉我。我仍在努力弄清楚这个问题,如果我无法解决它,那么我将尝试使用Scala来完成我的工作。 - thenakulchawla
我现在会尝试一下,看看能否让它正常工作。 - thenakulchawla
显示剩余2条评论

4

首先,您需要将Pandas数据框转换为Spark数据框:

from pyspark.sql import HiveContext
hive_context = HiveContext(sc)
df = hive_context.createDataFrame(pd_df)

接下来,您可以创建一个位于内存中的临时表:

df.registerTempTable('tmp')

现在,您可以使用Hive QL将数据保存到Hive中:

hive_context.sql("""insert overwrite table target partition(p='p') select a,b from tmp'''

请注意:hive_context 必须保持相同!

0
我将我的pandas数据框转换为临时表,方法如下:
1)将pandas数据框转换为Spark数据框:
spark_df=sqlContext.createDataFrame(Pandas_df)

2) 确保数据被正确地迁移

spark_df.select("*").show()

3) 将Spark DataFrame 转换为临时表以进行查询。

spark_df.registerTempTable("table_name").

干杯!


0

通过遵循这里的所有其他答案,我成功地将pandas数据帧转换为永久Hive表,如下所示:

# sc is a spark context created with enableHiveSupport()
from pyspark.sql import HiveContext
hc=HiveContext(sc)

# df is my pandas dataframe
sc.createDataFrame(df).registerTempTable('tmp')   

# sch is the hive schema, and tabname is my new hive table name
hc.sql("create table sch.tabname as select * from tmp") 


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接