使用pandas将数据框以追加方式写入parquet格式。

28
我试图在pandas中以append模式将dataframe写入到parquet文件格式(在最近的pandas版本0.21.0中引入)。然而,它并没有像预期那样将数据附加到现有文件中,而是用新数据覆盖了原有文件。我错过了什么吗?
写入语法如下:
df.to_parquet(path, mode='append')

阅读语法是

pd.read_parquet(path)

尝试以追加模式打开文件。 - Shihe Zhang
1
这个不起作用(与之前的情况没有区别) - Siraj S.
在文档中,to_parquet() API没有append模式。如果您想要追加到一个文件中,可以使用文件的append模式。这就是我之前试图表达的内容。 - Shihe Zhang
使用PyArrow,如何向Parquet文件追加数据? - Andrey
如果您想将内容追加到同一个文件中,则可以忽略我的评论,但有时将新的Parquet文件写入同一目录并使用另一个名称可能会很有用。因此,下次您可以读取目录而不是特定文件,并且您将获得该目录中每个Parquet文件中的数据。 - Ariel Catala Valencia
7个回答

25
看起来可以使用fastparquet将行组附加到已存在的parquet文件中。这是一个非常独特的功能,因为大多数库都没有这个实现。
以下是来自pandas doc的内容:
DataFrame.to_parquet(path, engine='auto', compression='snappy', index=None, partition_cols=None, **kwargs)

我们需要同时传递引擎和**kwargs。
引擎{‘auto’, ‘pyarrow’, ‘fastparquet’}
**kwargs - 传递给parquet库的额外参数。
**kwargs - 在这里我们需要传递的是:append=True (来自fastparquet)
import pandas as pd
from pathlib import Path

df = pd.DataFrame({'col1': [1, 2,], 'col2': [3, 4]})
file_path = Path("D:\\dev\\output.parquet")

if file_path.exists():
  df.to_parquet(file_path, engine='fastparquet', append=True)
else:
  df.to_parquet(file_path, engine='fastparquet')

如果将append设置为True并且文件不存在,则会显示以下错误。
AttributeError: 'ParquetFile' object has no attribute 'fmd'

运行以上脚本3次后,我在parquet文件中得到以下数据。 enter image description here 如果我检查元数据,我可以看到这导致了3个行组。

enter image description here


注意: 如果你写太多小的行组,追加可能会效率低下。通常建议的行组大小接近于100,000或1,000,000行。这对于非常小的行组有一些好处。压缩效果会更好,因为压缩仅在行组内进行。存储统计信息的开销也会更少,因为每个行组都存储自己的统计信息。

1
我尝试了使用pyarrow,但是失败了,所以似乎只能像作者建议的那样,使用fastparquet才能正常工作。 - DarkHark

7

如果要追加内容,可以这样做:

import pandas as pd 
import pyarrow.parquet as pq
import pyarrow as pa

dataframe = pd.read_csv('content.csv')
output = "/Users/myTable.parquet"

# Create a parquet table from your dataframe
table = pa.Table.from_pandas(dataframe)

# Write direct to your parquet file
pq.write_to_dataset(table , root_path=output)
这将自动附加到您的表中。

2
它将创建一个带有几个parquet文件的目录,作为pyarrow数据集。 - banderlog013

3

我使用了awswrangler库。它非常好用。

以下是参考文档

https://aws-data-wrangler.readthedocs.io/en/latest/stubs/awswrangler.s3.to_parquet.html

我已经从Kinesis流中读取了数据,并使用kinesis-python库消费消息并将其写入S3。由于本文涉及无法将数据附加到S3的问题,因此我没有包含JSON的处理逻辑。在AWS Sagemaker Jupyter中执行。

以下是我使用的示例代码:

!pip install awswrangler
import awswrangler as wr
import pandas as pd
evet_data=pd.DataFrame({'a': [a], 'b':[b],'c':[c],'d':[d],'e': [e],'f':[f],'g': [g]},columns=['a','b','c','d','e','f','g'])
#print(evet_data)
s3_path="s3://<your bucker>/table/temp/<your folder name>/e="+e+"/f="+str(f)
try:
    wr.s3.to_parquet(
    df=evet_data,
    path=s3_path,
    dataset=True,
    partition_cols=['e','f'],
    mode="append",
    database="wat_q4_stg",
    table="raw_data_v3",
    catalog_versioning=True  # Optional
    )
    print("write successful")       
except Exception as e:
    print(str(e))

有任何需要帮助的澄清,请随时联系。在我阅读的几篇文章中,建议先读取数据再进行覆盖。但是,随着数据量的增大,这种方法会使处理速度变慢,效率低下。


嘿,谢谢你提供这个 - 它似乎可以创建流畅的.parquet文件。有没有办法创建单一的parquet文件,或者至少是非snappy文件? - ethereumbrella
1
算了,它有一个压缩选项为None。 - ethereumbrella

1

1

pandas.to_parquet() 中没有追加模式,您可以读取现有文件,修改它,并覆盖性地写回它。


0
如果您正在考虑使用分区:
根据Pyarrow doc(在使用分区时调用的函数),您可能希望将partition_cols与唯一的basename_template名称结合使用。例如,类似以下内容:
df.to_parquet(root_path, partition_cols=["..."], basename_template="{i}")

您可以在没有重叠现有数据的情况下省略basename_template,但如果存在重叠,它会创建重复的.parquet文件。
如果您的分区列包含时间戳,这非常方便。这样,您实际上可以拥有一个"滚动"的DataFrame,并且不会写入重复的文件,只会创建与新时间对应的新文件。

-1

Pandas中的to_parquet()函数既可以处理单个文件,也可以处理包含多个文件的目录。如果文件已经存在,则Pandas会在不提示的情况下覆盖该文件。要将数据附加到parquet对象中,只需将新文件添加到同一个parquet目录中即可。

os.makedirs(path, exist_ok=True)

# write append (replace the naming logic with what works for you)
filename = f'{datetime.datetime.utcnow().timestamp()}.parquet'
df.to_parquet(os.path.join(path, filename))

# read
pd.read_parquet(path)

我认为这个会随着时间线性扩展,即比@Naveen在https://dev59.com/t1YN5IYBdhLWcg3wm5EL#64814917中建议的“append”模式更好 - 我是对的吗?而且to_parquet()支持S3,对吗? - jtlz2

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接