读取 Parquet 文件时刷新 DataFrame 的元数据

4
我想读取一个parquet文件作为dataframe,这个文件会周期性地更新(路径是/folder_name)。每当有新数据时,旧的parquet文件路径(/folder_name)将被重命名为临时路径,然后我们将合并新数据和旧数据并存储在旧路径(/folder_name)中。
发生的情况是,假设我们有一个parquet文件作为更新前的hdfs://folder_name/part-xxxx-xxx.snappy.parquet,更新后它变成了hdfs://folder_name/part-00000-yyyy-yyy.snappy.parquet 问题出现在我试图读取parquet文件时进行更新
sparksession.read.parquet("filename") => 它获取旧路径hdfs://folder_name/part-xxxx-xxx.snappy.parquet (路径存在)
当对dataframe调用操作时,它尝试从hdfs://folder_name/part-xxxx-xxx.snappy.parquet读取数据,但由于更新文件名已更改,因此我遇到了以下问题
java.io.FileNotFoundException: File does not exist: hdfs://folder_name/part-xxxx-xxx.snappy.parquet It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
我正在使用Spark 2.2
是否有人可以帮助我如何刷新元数据?

我在ipython内核(jupyter笔记本)中遇到了这个“FileNotFoundException”错误,错误信息是“...snappy.parquet”。通过重新启动内核解决了这个问题。 - Wassadamo
5个回答

4

当你尝试读取一个不存在的文件时,就会出现这个错误。

如果我没理解错的话,我怀疑你在保存新数据帧时(使用.mode("overwrite")),覆盖了所有文件。在此过程中,你试图读取已被删除的文件,会抛出异常 - 这使得表在更新期间不可用。

据我所知,没有直接刷新元数据的方法。

以下是解决此问题的两种方法之一(多种可能性):

1 - 使用追加模式

如果你只想将新的数据帧附加到旧的数据帧上,那么无需创建临时文件夹并覆盖旧文件。你可以将保存模式从"覆盖"更改为"追加"。这样,你就可以向现有的Parquet文件添加分区,而无需重写现有分区。

df.write
  .mode("append")
  .parquet("/temp_table")

这是迄今为止最简单的解决方案,不需要读取已经存储的数据。然而,如果您需要更新旧数据(例如:如果您正在进行upsert操作),则此方法无法奏效。对于这种情况,您可以选择第二种方案:
2 - 使用Hive视图
您可以创建Hive表,并使用视图指向最新(并且可用的)表格。
以下是这种方法背后的逻辑示例:
第一部分:
- 如果视图不存在,则创建一个名为_alpha0的新表来存储新数据。 - 创建表之后,我们将创建一个视图,如下所示:select * from _alpha0
第二部分:
- 如果视图存在,则需要查看它指向哪个表格<(table_name>_alphaN)> - 进行所有想要执行的操作,并将其保存为名为_alpha(N+1)的表格 - 创建表之后,我们将更改视图以便选择* from_alpha(N+1)
以下是代码示例:
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types._
import spark.implicits._


//This method verifies if the view exists and returns the table it is pointing to (using the query 'describe formatted')

def getCurrentTable(spark: SparkSession, databaseName:String, tableName: String): Option[String] = {
  if(spark.catalog.tableExists(s"${databaseName}.${tableName}")) {

    val rdd_desc = spark.sql(s"describe formatted ${databaseName}.${tableName}")
      .filter("col_name == 'View Text'")
      .rdd

    if(rdd_desc.isEmpty()) {
      None
    }
    else {
      Option(
        rdd_desc.first()
          .get(1)
          .toString
          .toLowerCase
          .stripPrefix("select * from ")
      )
    }
  }
  else
    None
}

//This method saves a dataframe in the next "alpha table" and updates the view. It maintains 'rounds' tables (default=3). I.e. if the current table is alpha2, the next one will be alpha0 again.

def saveDataframe(spark: SparkSession, databaseName:String, tableName: String, new_df: DataFrame, rounds: Int = 3): Unit ={
  val currentTable = getCurrentTable(spark, databaseName, tableName).getOrElse(s"${databaseName}.${tableName}_alpha${rounds-1}")
  val nextAlphaTable = currentTable.replace(s"_alpha${currentTable.last}",s"_alpha${(currentTable.last.toInt + 1) % rounds}")

  new_df.write
    .mode("overwrite")
    .format("parquet")
    .option("compression","snappy")
    .saveAsTable(nextAlphaTable)

  spark.sql(s"create or replace view ${databaseName}.${tableName} as select * from ${nextAlphaTable}")
}

//An example on how to use this:

//SparkSession: spark
val df = Seq((1,"I"),(2,"am"),(3,"a"),(4,"dataframe")).toDF("id","text")
val new_data = Seq((5,"with"),(6,"new"),(7,"data")).toDF("id","text")
val dbName = "test_db"
val tableName = "alpha_test_table"

println(s"Current table: ${getCurrentTable(spark, dbName, tableName).getOrElse("Table does not exist")}")
println("Saving dataframe")

saveDataframe(spark, dbName, tableName, df)

println("Dataframe saved")
println(s"Current table: ${getCurrentTable(spark, dbName, tableName).getOrElse("Table does not exist")}")
spark.read.table(s"${dbName}.${tableName}").show

val processed_df = df.unionByName(new_data) //Or other operations you want to do

println("Saving new dataframe")
saveDataframe(spark, dbName, tableName, processed_df)

println("Dataframe saved")
println(s"Current table: ${getCurrentTable(spark, dbName, tableName).getOrElse("Table does not exist")}")
spark.read.table(s"${dbName}.${tableName}").show

结果:

Current table: Table does not exist
Saving dataframe
Dataframe saved
Current table: test_db.alpha_test_table_alpha0
+---+---------+
| id|     text|
+---+---------+
|  3|        a|
|  4|dataframe|
|  1|        I|
|  2|       am|
+---+---------+

Saving new dataframe
Dataframe saved
Current table: test_db.alpha_test_table_alpha1
+---+---------+
| id|     text|
+---+---------+
|  3|        a|
|  4|dataframe|
|  5|     with|
|  6|      new|
|  7|     data|
|  1|        I|
|  2|       am|
+---+---------+

通过这样做,您可以保证视图 <table_name> 的一个版本始终可用。这也有一个优点(或不是,取决于您的情况),即维护表格的先前版本。即 <table_name_alpha1> 的上一个版本将是 <table_name_alpha0>

3 - 奖励

如果升级您的Spark版本是一个选项,可以查看 Delta Lake(最低Spark版本:2.4.2)。

希望这有所帮助 :)


我们还没有集成Hive。因此,使用追加模式会创建许多文件。有什么选项吗? - wazza
你尝试过在追加文件之前使用合并函数来减少文件数量吗? - André Machado

3

先缓存parquet文件,然后执行覆盖操作。

var tmp = sparkSession.read.parquet("path/to/parquet_1").cache()
tmp.write.mode(SaveMode.Overwrite).parquet("path/to/parquet_1") // same path

因为Spark采用了惰性计算,所以会抛出错误。当DAG执行"write"命令时,它开始同时读取Parquet并写入/覆盖。


2

Spark没有像Zookeeper一样的事务管理器来对文件进行锁定,因此进行并发读/写是一个需要单独处理的挑战。

要刷新目录,您可以执行以下操作:

spark.catalog.refreshTable("my_table")

OR

spark.sql(s"REFRESH TABLE $tableName")

我没有使用Hive,因此我没有任何表名。针对这种情况该怎么办? - wazza
你正在运行独立的Spark还是与Hive Metastore集成?如果已经集成,那么这个选项将起作用。 - Jayadeep Jayaraman
不,仅在Yarn中使用Spark。我们尚未集成Hive。 - wazza
正如我所提到的,Spark 没有事务管理器。只有两个选择:1)使用最近作为开源项目发布的 Delta Lake;2)集成 Hive 以进行所有元数据操作,这与使用 Hive 不同。最后一个选项是将新文件附加到文件夹位置中并创建一个视图来仅获取最新记录,从而避免 1) 和 2) 的复杂性。 - Jayadeep Jayaraman

1
  1. 一个简单的解决方案是使用 df.cache.count 先将数据缓存到内存中,然后与新数据进行合并,并以 overwrite 模式写入到 /folder_name。在这种情况下,您不需要使用 temp 路径。

  2. 您提到正在将 /folder_name 重命名为某个临时路径。因此,您应该从该临时路径读取旧数据,而不是从 hdfs://folder_name/part-xxxx-xxx.snappy.parquet 读取。


0

示例

从阅读您的问题,我认为如果是这样,您应该能够在不使用DeltaLake的情况下运行代码。在下面的用例中,Spark将按如下方式运行代码:(1)加载inputDF并将文件夹位置的文件名存储在本地[在此情况下为显式部分文件名];(2a)到达第2行并覆盖tempLocation中的文件;(2b)从inputDF加载内容并将其输出到tempLocation;(3)按照与1相同的步骤但在tempLocation上执行;(4a)删除inputLocation文件夹中的文件;和(4b)尝试从1中缓存的part文件加载数据以从inputDF运行union并因文件不存在而中断。

val inputDF = spark.read.format("parquet").load(inputLocation)
inputDF.write.format("parquet").mode("overwrite").save(tempLocation)

val tempDF = spark.read.foramt("parquet").load(tempLocation)

val outputDF = inputDF.unionAll(tempDF)
outputDF.write.format("parquet").mode("overwrite").save(inputLocation)

从我的经验来看,你可以选择两种方式,即持久性或临时输出用于覆盖的所有内容。
持久性:
在下面的使用案例中,我们将加载inputDF并立即将其另存为另一个元素并将其持久化。在执行操作时,持久化将应用于数据而不是文件夹中的文件路径。
否则,你可以对outputDF进行持久化,这将具有相同的效果。由于持久化与数据而非文件路径相关联,输入的销毁不会导致覆盖期间文件路径丢失。
val inputDF = spark.read.format("parquet").load(inputLocation) 

val inputDF2 = inputDF.persist
inputDF2.count

inputDF2.write.format("parquet").mode("overwrite").save(tempLocation)

val tempDF = spark.read.foramt("parquet").load(tempLocation)

val outputDF = inputDF2.unionAll(tempDF) outputDF.write.format("parquet").mode("overwrite").save(inputLocation)

临时加载

如果您不是加载联合输入的临时输出,而是将outputDF完全加载到临时文件中,并重新加载该文件以获取输出,则不应看到文件未找到错误。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接