持久化Spark Dataframe

4

我刚接触Spark。我们如何将Dataframe持久化,以便我们可以在组件之间使用它。

我有一个Kafka流,通过Rdd生成Dataframe。尝试注册为临时表,但另一个程序中无法访问该表。

我想通过sqlContext在另一个类中访问此Dataframe,并使用查询结果进行进一步的计算。

2个回答

2
DataFrames也可以使用saveAsTable命令保存为Hive metastore中的持久表。请注意,使用此功能不需要现有的Hive部署。Spark将为您创建一个默认的本地Hive metastore(使用Derby)。与createOrReplaceTempView命令不同,saveAsTable将实现DataFrame的内容并在Hive metastore中创建指向数据的指针。
只要保持连接到同一metastore,即使您的Spark程序重新启动,持久表仍将存在。可以通过在SparkSession上调用table方法并提供表名来创建持久表的DataFrame。
默认情况下,saveAsTable将创建一个“管理表”,这意味着数据的位置将受到metastore的控制。当删除表时,管理表的数据也将自动删除。

我也尝试了使用 SaveAsTable("tablename") 方法。但是,当我尝试通过 sqlContext 在另一个程序中获取该表时,仍然出现了“找不到表”的异常。 - Bindumalini KK
我在讨论中了解到,使用HiveContext而不是SQLContext将持久化表格,可以在组件之间访问。所以我创建了HiveContect,并使用hiveContext.saveTable("table")。我对Hive不是很了解。如果能指导我如何配置元数据存储,将是一个很大的帮助。 - Bindumalini KK
你已经安装了Hive吗?如果已经存在,你应该已经有元数据存储了。 - Arunakiran Nulu
不是安装的问题。只是在使用这个工件:spark-hivecontext-compatibility_2.10。 - Bindumalini KK
根据我之前的帖子,你不需要安装Hive,Spark会为你创建本地Hive元存储,使用saveAsTable并尝试再次访问,如果有任何问题,请告诉我。 - Arunakiran Nulu
它抛出编译错误,说必须使用sqlContext来存储“临时”表。但是当我使用HiveContext时,我再次收到错误,因为它已被弃用。 - Bindumalini KK

2

您可以将DataFrame的内容保存为Parquet文件,并在另一个程序中读取相同的内容。您可以在下一个程序中注册为Temp表。Spark SQL支持读写Parquet文件,自动保留原始数据的模式。

//First Program
dataframe.write.format("parquet").save("/tmp/xyz-dir/card.parquet")
//where /tmp/xyz-dir/ is a HDFS directory

//Second Program
val parquetRead = sqlContext.read.format("parquet").load("/tmp/xyz-dir/card.parquet")

//Parquet files can also be registered as tables and then used in SQL statements.
parquetRead.registerTempTable("parquettemptable")
val cust= sqlContext.sql("SELECT name FROM parquettemptable")

//After use of parquet file, delete the same in the second program
val fs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://hostname:8030"), sc.hadoopConfiguration)
fs.delete(new org.apache.hadoop.fs.Path("/tmp/xyz-dir"),true) // isRecusrive= true

1
@BindumaliniKK saveAsParquetFile 已弃用(自版本1.4.0起)。请使用 write.parquet(path) 请参阅1.3方法,Arvind请相应修改。 - Ram Ghadiyaram
我遇到了异常:INFO org.apache.spark.sql.execution.datasources.parquet.ParquetRelation - 在驱动程序上列出文件:/SparkSpace/PaymentCardFraudResearch/card.parquet Exception in thread "main" java.lang.AssertionError: 断言失败:在文件:/SparkSpace/PaymentCardFraudResearch/card.parquet下找不到预定义的模式,也没有Parquet数据文件或摘要文件。 at scala.Predef$.assert(Predef.scala:179) - Bindumalini KK
写作为:cdrDF.write.parquet("card.parquet") 读取为:val parquetRead = sqlContext.read.parquet("card.parquet") - Bindumalini KK
@ArvindKumar:我们需要添加任何依赖项吗:因为我遇到了一个异常:Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.execution.datasources.FileFormat。我正在按照更新后的代码片段进行操作。 - Bindumalini KK
请检查您的pom.xml文件,参考https://github.com/apache/spark/blob/master/sql/core/pom.xml。将Scala透视图添加到您的项目文件中。 - Arvind Kumar
显示剩余3条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接