更新Apache Parquet文件中的值。

22

我有一个相当庞大的Parquet文件,需要更改其中一列的值。一种方法是在源文本文件中更新这些值并重新创建Parquet文件,但我想知道是否有更便宜、更容易的解决方案。


3
不行,你需要重新创建文件。 - Dan Osipov
@DanOsipov 谢谢。我认为这种限制是由于使用了各种压缩算法,使得更新列值变得不容易甚至不可能。 - marcin_koss
1
我认为这是一个更基本的问题,而不仅仅是一个parquet特定的问题。在高数据量的世界中,parquet被广泛使用,不可变性是您需要关心的事情。从这个角度来看,您希望加载数据,转换数据,然后再次写入数据。您可以考虑只写入所需的列,因为它是一种按列格式,所以更有效率。 - Fokko Driesprong
1
我理解您想要更新先前运行中已写入的字段。也许这篇文章可以帮助您。我不是在推广任何产品,请专注于所涉及的概念,而不是广告宣传的产品。https://databricks.com/blog/2018/10/29/simplifying-change-data-capture-with-databricks-delta.html - Richard Gomes
4个回答

18

让我们从基础知识开始:

Parquet是一种需要保存在文件系统中的文件格式。

关键问题:

  1. Parquet是否支持追加操作?
  2. 文件系统(即HDFS)是否允许对文件进行追加操作?
  3. 作业框架(Spark)是否可以实现追加操作?

答案:

  1. parquet.hadoop.ParquetFileWriter仅支持CREATEOVERWRITE;没有append模式。(不确定但其他实现可能会改变 - parquet设计支持append

  2. HDFS允许使用dfs.support.append属性对文件进行追加操作。

  3. Spark框架不支持向现有parquet文件追加,并且没有计划;请参见此JIRA:https://issues.apache.org/jira/browse/SPARK-18199

在分布式系统中追加到现有文件不是一个好主意,特别是考虑到我们可能同时拥有两个写入者。

更多详细信息在此处:


感谢您提供详细的答案和背景信息,这非常有帮助。 - Keith
这是一份很好的解释。以下几点需要注意:1)如果PARQUET本身支持追加,那么说它是不可变的是不正确的。2)您能帮我理解为什么在分布式系统中追加到现有文件不好吗? - Ritesh

8

有一些解决方法,但需要按照特定方式创建parquet文件以便更容易更新。

最好的做法:

A. 使用行组来创建parquet文件。您需要优化行组中可以放入多少行数据,以使数据压缩和字典编码等功能停止运行。

B. 逐个扫描行组并找出需要更新的行组。为每个修改后的行组生成包含修改数据的新parquet文件。逐个处理一个行组的数据比处理整个文件更节省内存。

C. 通过追加未修改的行组和使用读入一个parquet文件生成的修改后的行组来重建原始parquet文件。

使用行组重新组装parquet文件非常快速。

理论上,如果只去掉页脚(统计信息),添加新的行组并添加更新统计信息的新页脚,就可以轻松地将内容添加到现有的parquet文件中,但没有支持此操作的API /库。


有没有 Git 代码可以理解这种方法? - JustTry
1
我使用这种方法来更新parquet文件。在编写parquet文件时,我创建了第二个parquet文件,它充当主索引,跟踪键入记录所在的parquet文件/行组。 我能够快速提取数据,修改它,然后重新组装parquet文件,使用其原始行组,减去提取的行组,加上修改后的行组。 以下是一些基本信息,可帮助处理行组。https://arrow.apache.org/docs/python/parquet.html#finer-grained-reading-and-writing - David Lee

3
请查看这篇很好的博客,它可以回答您的问题并提供使用Spark(Scala)执行更新的方法:http://aseigneurin.github.io/2017/03/14/incrementally-loaded-parquet-files.html 从博客中复制粘贴如下内容: 当我们需要编辑数据时,在我们的数据结构(Parquet)中,数据是不可变的。
引用: 您可以将分区添加到Parquet文件中,但不能直接编辑数据。 但是,最终我们可以更改数据,我们只需要接受我们不会直接进行更改。我们需要使用模式和UDF的组合重新创建Parquet文件以纠正错误数据。
如果您想在Parquet中逐步追加数据(您没有问这个问题,但对其他读者可能有用) 请参阅这篇写得很好的博客:http://aseigneurin.github.io/2017/03/14/incrementally-loaded-parquet-files.html 免责声明:我没有撰写这些博客,我只是阅读并发现它可能对其他人有用。

我明白这个问题涉及到的内容比仅仅附加分区要复杂一些,正如博客文章所解释的那样。假设有这样的一个场景,某些数据有效直到有更多的数据到来,这在SQL数据库中可以很容易地实现,但在Parquet文件中你不能只更新记录的字段,因此需要有创意的解决方法。 - Richard Gomes
我对第二篇博客的理解是,这个附加数据会创建一个新的文件,并且这个新文件被视为现有parquet文件的一个新分区。问题是:如果我想使用SparQl或AWS Athena在表格上运行SQL(即包含所有分区的parquet文件),我认为它需要关于所有组成文件的元数据。而且文件/分区越多,元数据就越多。这会给SparQL或AWS Athena增加额外的开销。我还可以说,读取多个小文件比顺序读取一个大文件效率更低。对吗? - undefined

2

您需要重新创建文件,这是Hadoop的方法。特别是当文件被压缩时。

另一种方法(在大数据中非常常见)是在另一个Parquet(或ORC)文件上执行更新,然后在查询时进行JOIN / UNION操作。


嗯,在2022年,我强烈建议使用湖屋解决方案,例如deltaLake或Apache Iceberg。它们会为您处理这个问题。


你的意思是版本控制系统吗? - JustTry
1
是的!现在在2022年,我强烈建议使用类似deltaLake或Apache Iceberg的湖屋解决方案。它们会为您处理这些问题。 - Thomas Decaux

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接