使用pyarrow如何向parquet文件追加数据?

54
如何使用 pyarrowparquet 文件追加/更新数据?
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq


 table2 = pd.DataFrame({'one': [-1, np.nan, 2.5], 'two': ['foo', 'bar', 'baz'], 'three': [True, False, True]})
 table3 = pd.DataFrame({'six': [-1, np.nan, 2.5], 'nine': ['foo', 'bar', 'baz'], 'ten': [True, False, True]})


pq.write_table(table2, './dataNew/pqTest2.parquet')
#append pqTest2 here?  

我在文档中没有找到有关附加parquet文件的内容。此外,您能否使用pyarrow和多进程一起插入/更新数据。


1
你是故意在两个表中使用完全不同的列名吗? - Dima Fomin
5个回答

49

我遇到了同样的问题,我认为我通过以下方法解决了它:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq


chunksize=10000 # this is the number of lines

pqwriter = None
for i, df in enumerate(pd.read_csv('sample.csv', chunksize=chunksize)):
    table = pa.Table.from_pandas(df)
    # for the first chunk of records
    if i == 0:
        # create a parquet write object giving it an output file
        pqwriter = pq.ParquetWriter('sample.parquet', table.schema)            
    pqwriter.write_table(table)

# close the parquet writer
if pqwriter:
    pqwriter.close()

2
当然,这取决于数据,但根据我的经验,chunksize=10000 太大了。在大多数情况下,块大小约为一百的值对我来说更快。 - Yury Kirienko
2
由于在两种情况下都要写入表格,因此if后面的else是不必要的。 - hodisr
1
谢谢!截至目前,逐步编写Parquet的API文档确实不是很完善。 - Michele Piccolini
1
@YuryKirienko 我使用 chunksize=1e5 取得了最佳性能。对于其他人的最佳建议是:使用不同的值进行基准测试,看看哪个最适合你。 - Michele Piccolini
2
这个解决方案只在写入者仍然打开的情况下有效...更好的方法是将文件放在一个目录中。pandas/pyarrow会在读取目录时将两个文件附加到数据帧中。 - natbusa
显示剩余5条评论

19
在你的情况下,列名不一致。我将三个示例数据帧的列名统一,并且下面的代码对我起作用了。
# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq


def append_to_parquet_table(dataframe, filepath=None, writer=None):
    """Method writes/append dataframes in parquet format.

    This method is used to write pandas DataFrame as pyarrow Table in parquet format. If the methods is invoked
    with writer, it appends dataframe to the already written pyarrow table.

    :param dataframe: pd.DataFrame to be written in parquet format.
    :param filepath: target file location for parquet file.
    :param writer: ParquetWriter object to write pyarrow tables in parquet format.
    :return: ParquetWriter object. This can be passed in the subsequenct method calls to append DataFrame
        in the pyarrow Table
    """
    table = pa.Table.from_pandas(dataframe)
    if writer is None:
        writer = pq.ParquetWriter(filepath, table.schema)
    writer.write_table(table=table)
    return writer


if __name__ == '__main__':

    table1 = pd.DataFrame({'one': [-1, np.nan, 2.5], 'two': ['foo', 'bar', 'baz'], 'three': [True, False, True]})
    table2 = pd.DataFrame({'one': [-1, np.nan, 2.5], 'two': ['foo', 'bar', 'baz'], 'three': [True, False, True]})
    table3 = pd.DataFrame({'one': [-1, np.nan, 2.5], 'two': ['foo', 'bar', 'baz'], 'three': [True, False, True]})
    writer = None
    filepath = '/tmp/verify_pyarrow_append.parquet'
    table_list = [table1, table2, table3]

    for table in table_list:
        writer = append_to_parquet_table(table, filepath, writer)

    if writer:
        writer.close()

    df = pd.read_parquet(filepath)
    print(df)

输出:

   one  three  two
0 -1.0   True  foo
1  NaN  False  bar
2  2.5   True  baz
0 -1.0   True  foo
1  NaN  False  bar
2  2.5   True  baz
0 -1.0   True  foo
1  NaN  False  bar
2  2.5   True  baz

很遗憾,这无法附加到现有的.parquet文件中(请参见我的答案)。原因:一旦调用.close(),文件就无法附加,而在调用.close()之前,.parquet文件无效(由于缺少二进制页脚,将抛出异常导致文件损坏)。@Contango的答案解决了这个问题。 - Contango

18

一般而言,Parquet数据集由多个文件组成,因此您可以通过将另一个文件写入数据所属的同一目录来进行追加。方便起见,能够轻松地连接多个文件将非常有用。我在 https://issues.apache.org/jira/browse/PARQUET-1154 中提出了这个想法,以使其在C++(因此也适用于Python)中易于实现。


1
请包括更新数据。也许在Arrow中有一些东西可以起作用。 - Merlin
1
请到Arrow和Parquet的邮件列表中提出您的问题。Stack Overflow并不是获取支持的最佳途径。 - Wes McKinney
2
Parquet-Tools 命令中是否没有 parquet-merge 选项?- 至少从命令行中是这样吗?(免责声明:我还没有尝试过) - natbusa
1
Parquet文件有时在Windows上显示为单个文件。如何在Windows上将其视为文件夹? - xiaodai

10

演示如何将Pandas数据框追加到现有的.parquet文件中。

注意:其他答案无法将内容追加到现有的.parquet文件中,但此方法可以;请参见结尾处的讨论。

在Windows和Linux上测试了Python v3.9。

使用pip安装PyArrow:

pip install pyarrow==6.0.1

或者Anaconda/ Miniconda

conda install -c conda-forge pyarrow=6.0.1 -y

演示代码:
# Q. Demo?
# A. Demo of appending to an existing .parquet file by memory mapping the original file, appending the new dataframe, then writing the new file out.

import os
import numpy as np
import pandas as pd
import pyarrow as pa  
import pyarrow.parquet as pq  

filepath = "parquet_append.parquet"

方法1/2

简单方法:使用pandas将原始的.parquet文件读入,追加后,再将整个文件写回。

# Create parquet file.
df = pd.DataFrame({"x": [1.,2.,np.nan], "y": ["a","b","c"]})  # Create dataframe ...
df.to_parquet(filepath)  # ... write to file.

# Append to original parquet file.
df = pd.read_parquet(filepath)  # Read original ...
df2 = pd.DataFrame({"x": [3.,4.,np.nan], "y": ["d","e","f"]})  # ... create new dataframe to append ...
df3 = pd.concat([df, df2])  # ... concatenate together ...
df3.to_parquet(filepath)  # ... overwrite original file.

# Demo that new data frame has been appended to old.
df_copy = pd.read_parquet(filepath)
print(df_copy)
#      x  y
# 0  1.0  a
# 1  2.0  b
# 2  NaN  c
# 0  3.0  d
# 1  4.0  e
# 2  NaN  f

第二种方法:更为复杂但速度更快

使用原生PyArrow调用,内存映射原始文件,将新的数据框追加到其中,然后将新文件写出。

# Write initial file using PyArrow.
df = pd.DataFrame({"x": [1.,2.,np.nan], "y": ["a","b","c"]})  # Create dataframe ...
table = pa.Table.from_pandas(df)
pq.write_table(table, where=filepath)

def parquet_append(filepath:Path or str, df: pd.DataFrame) -> None:
    """
    Append to dataframe to existing .parquet file. Reads original .parquet file in, appends new dataframe, writes new .parquet file out.
    :param filepath: Filepath for parquet file.
    :param df: Pandas dataframe to append. Must be same schema as original.
    """
    table_original_file = pq.read_table(source=filepath,  pre_buffer=False, use_threads=True, memory_map=True)  # Use memory map for speed.
    table_to_append = pa.Table.from_pandas(df)
    table_to_append = table_to_append.cast(table_original_file.schema)  # Attempt to cast new schema to existing, e.g. datetime64[ns] to datetime64[us] (may throw otherwise).
    handle = pq.ParquetWriter(filepath, table_original_file.schema)  # Overwrite old file with empty. WARNING: PRODUCTION LEVEL CODE SHOULD BE MORE ATOMIC: WRITE TO A TEMPORARY FILE, DELETE THE OLD, RENAME. THEN FAILURES WILL NOT LOSE DATA.
    handle.write_table(table_original_file)
    handle.write_table(table_to_append)
    handle.close()  # Writes binary footer. Until this occurs, .parquet file is not usable.

# Append to original parquet file.
df = pd.DataFrame({"x": [3.,4.,np.nan], "y": ["d","e","f"]})  # ... create new dataframe to append ...
parquet_append(filepath, df)

# Demo that new data frame has been appended to old.
df_copy = pd.read_parquet(filepath)
print(df_copy)
#      x  y
# 0  1.0  a
# 1  2.0  b
# 2  NaN  c
# 0  3.0  d
# 1  4.0  e
# 2  NaN  f

讨论

@Ibraheem Ibraheem 和 @yardstick17 的答案不能用于追加到现有的 .parquet 文件中:

  • 限制 1:一旦调用了 .close(),文件就无法再追加。一旦页脚被写入,一切都已经定型;
  • 限制 2:在调用 .close() 之前,任何其他程序都无法读取 .parquet 文件(因为二进制页脚缺失会抛出异常)。

综合这些限制,意味着它们不能用于追加到现有的 .parquet 文件中,它们只能用于以块的形式编写 .parquet 文件。上述技术消除了这些限制,但代价是要将整个文件重写以追加到结尾。经过广泛的研究,我相信使用现有的 PyArrow 库(截至 v6.0.1),不可能追加到现有的 .parquet 文件中。

可以将此方法修改为将文件夹中的多个 .parquet 文件合并成单个 .parquet 文件。

可以执行有效的 upsert 操作:pq.read_table() 具有对列和行进行筛选的过滤器,因此如果在加载时从原始表格中筛选出了行,则新表格中的行会有效地替换旧表格中的行。这对于时间序列数据更有用。


令人惊讶的是,Fastparquet 可以让我们将行组附加到已经存在的 Parquet 文件中。 - ns15

5
被接受的答案只有在您打开了pyarrow parquet writer时才有效。一旦writer关闭,我们就无法向parquet文件追加行组。pyarrow没有任何实现来追加到已经存在的parquet文件。
使用fastparquet可以向已经存在的parquet文件追加行组。这里是一个解释如何做到这一点的SO答案,并附有示例。
从fastparquet 文档中:
append:bool(False)或'overwrite'。如果为False,则从头构建数据集;如果为True,则将新的行组添加到现有数据集中。在后一种情况下,数据集必须存在,并且架构必须与输入数据匹配。
from fastparquet import write
write('output.parquet', df, append=True)

更新: 希望在pyarrow中也能实现这个功能 - JIRA

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接