使用Python读取文件夹中的多个Parquet文件并将其写入单个CSV文件

31

我是Python的新手,我有一个场景,其中有多个带有按顺序排列的文件名的parquet文件。例如:par_file1、par_file2、par_file3等,直到文件夹中达到100个文件。

我需要按顺序读取这些parquet文件,从file1开始,并将其写入单个csv文件中。在写入file1的内容后,应该将file2的内容附加到相同的csv文件中(不包含标题)。请注意,所有文件具有相同的列名称,只是数据分为多个文件。

我学会使用以下代码将单个parquet转换为csv文件:

import pandas as pd    
df = pd.read_parquet('par_file.parquet')    
df.to_csv('csv_file.csv')

但我无法将其扩展到循环处理多个parquet文件并附加到单个csv中。 Pandas中是否有一种方法可以做到这一点?或者采用其他方式完成此操作将非常有帮助。谢谢。


你是否在与 Parquet 文件相同的目录下运行 Python 代码? - It_is_Chris
我正在考虑将Parquet文件复制到本地文件夹并从本地机器上运行Python代码。我完全是新手,不确定在哪里运行Python代码。如果可以在代码中给出HDFS文件夹位置并将内容复制到本地的CSV文件中,那也完全可以。如果我没有正确理解您的查询,我很抱歉。 - Pri31
6个回答

48

我查找pandas是否原生支持读取分区parquet数据集时遇到了这个问题。我必须说,当前的答案过于冗长(使其难以解析)。而且我想象中,根据大小不断打开/关闭文件句柄然后扫描它们的末尾并不是特别高效。

更好的选择是将所有的parquet文件读入一个单独的DataFrame中,并一次性写入:

from pathlib import Path
import pandas as pd

data_dir = Path('dir/to/parquet/files')
full_df = pd.concat(
    pd.read_parquet(parquet_file)
    for parquet_file in data_dir.glob('*.parquet')
)
full_df.to_csv('csv_file.csv')

或者,如果你真的只想将内容追加到文件中:

data_dir = Path('dir/to/parquet/files')
for i, parquet_path in enumerate(data_dir.glob('*.parquet')):
    df = pd.read_parquet(parquet_path)
    write_header = i == 0 # write header only on the 0th file
    write_mode = 'w' if i == 0 else 'a' # 'write' mode for 0th file, 'append' otherwise
    df.to_csv('csv_file.csv', mode=write_mode, header=write_header)

另一种将每个文件附加到目标CSV文件的最终选择是,在开头以 "a+"模式打开目标文件,对于每次写入/附加都将文件句柄扫描到文件末尾(我相信这是有效的,但尚未实际测试):

data_dir = Path('dir/to/parquet/files')
with open('csv_file.csv', "a+") as csv_handle:
    for i, parquet_path in enumerate(data_dir.glob('*.parquet')):
        df = pd.read_parquet(parquet_path)
        write_header = i == 0 # write header only on the 0th file
        df.to_csv(csv_handle, header=write_header)

3
这种读取并连接的方法会受到内存容量(8G、16GB)的限制,而不管数据大小。而采用打开再追加的方式则没有这个限制。 - B.Mr.W.
@B.Mr.W. 非常感谢! - M.Ionut
@B.Mr.W. 开放+追加的方法也会受到内存的限制,因为pd.read_parquet一次性将parquet文件读入RAM。因此,如果parquet文件非常大,会导致内存错误,因为无法将数据框存储在RAM中。为了保持稳定,需要将parquet文件分成部分/块进行处理。 - undefined

32

我有一个类似的需求,我看到当前 Pandas 版本支持将目录路径作为参数传递给 read_csv 函数。因此,您可以像这样读取多个 parquet 文件:

import pandas as pd    
df = pd.read_parquet('path/to/the/parquet/files/directory')    

它将所有内容连接成一个单独的数据框,因此您可以立即将其转换为csv:

df.to_csv('csv_file.csv')

请确保您按照文档具备以下依赖项:

  • pyarrow
  • fastparquet

对我来说不起作用。 - crobar

12
这帮助我将所有parquet文件加载到一个数据框中。
import glob
 files = glob.glob("*.snappy.parquet")
 data = [pd.read_parquet(f,engine='fastparquet') for f in files]
 merged_data = pd.concat(data,ignore_index=True)

1
你不是受内存限制的吗? - M.Ionut

2
如果您要将文件复制到本地计算机并运行代码,可以按照以下方式操作。下面的代码假定您在与parquet文件相同的目录中运行代码。它还假定文件命名如上所述:“order.例如:par_file1、par_file2、par_file3等,直到一个文件夹中有100个文件。”如果您需要搜索文件,则需要使用glob获取文件名,并明确提供要保存csv文件的路径:open(r'this\is\your\path\to\csv_file.csv','a')希望这能帮到您。
import pandas as pd

# Create an empty csv file and write the first parquet file with headers
with open('csv_file.csv','w') as csv_file:
    print('Reading par_file1.parquet')
    df = pd.read_parquet('par_file1.parquet')
    df.to_csv(csv_file, index=False)
    print('par_file1.parquet appended to csv_file.csv\n')
    csv_file.close()

# create your file names and append to an empty list to look for in the current directory
files = []
for i in range(2,101):
    files.append(f'par_file{i}.parquet')

# open files and append to csv_file.csv
for f in files:
    print(f'Reading {f}')
    df = pd.read_parquet(f)
    with open('csv_file.csv','a') as file:
        df.to_csv(file, header=False, index=False)
        print(f'{f} appended to csv_file.csv\n')

如果您想要的话,可以删除打印语句。

在使用 pandas 0.23.3python 3.6 中进行了测试。


非常感谢,这正是我想要的。我通过替换parquet和csv文件位置的绝对路径在idle中运行了此代码。我还有一个问题。如果我不将文件复制到本地并希望直接从hdfs文件夹位置读取parquet文件,然后将内容附加到本地的csv文件中,那么是否有一种方法可以在此代码中给出hdfs路径? - Pri31
@Pri31 这份文档应该会有所帮助:https://crs4.github.io/pydoop/api_docs/hdfs_api.html - It_is_Chris

0
你可以使用Dask读取多个Parquet文件并将它们写入一个单独的CSV文件。
Dask接受星号(*)作为通配符/ glob字符来匹配相关的文件名。
在写入CSV文件时,请确保将single_file设置为True,将index设置为False。
import pandas as pd
import numpy as np

# create some dummy dataframes using np.random and write to separate parquet files
rng = np.random.default_rng()

for i in range(3):
    df = pd.DataFrame(rng.integers(0, 100, size=(10, 4)), columns=list('ABCD'))
    df.to_parquet(f"dummy_df_{i}.parquet")

# load multiple parquet files with Dask
import dask.dataframe as dd
ddf = dd.read_parquet('dummy_df_*.parquet', index=False)

# write to single csv
ddf.to_csv("dummy_df_all.csv", 
           single_file=True, 
           index=False
)

# test to verify
df_test = pd.read_csv("dummy_df_all.csv")

使用Dask可以避免担心结果文件大小的问题(Dask是一个分布式计算框架,可以处理任何你投入其中的东西,而pandas可能会在结果DataFrame太大时抛出MemoryError),并且你可以轻松地从云数据存储(如Amazon S3)中读取和写入。

0

对于那些尝试读取远程文件的人来说,有一个小变化可以帮助更快地读取它(对于我来说,直接使用read_parquet来读取远程文件要慢得多):

import io
merged = []
# remote_reader = ... <- init some remote reader, for example AzureDLFileSystem()
for f in files:
    with remote_reader.open(f, 'rb') as f_reader:
        merged.append(remote_reader.read())
merged = pd.concat((pd.read_parquet(io.BytesIO(file_bytes)) for file_bytes in merged))

虽然会增加一些临时内存开销。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接