如何使用Python中的pyarrow从S3读取分区parquet文件

64

我正在寻找使用Python从S3中的多个分区目录读取数据的方法。

data_folder/serial_number=1/cur_date=20-12-2012/abcdsd0324324.snappy.parquet
data_folder/serial_number=2/cur_date=27-12-2012/asdsdfsd0324324.snappy.parquet

pyarrow的ParquetDataset模块具有读取分区的功能。因此,我尝试了以下代码:

>>> import pandas as pd
>>> import pyarrow.parquet as pq
>>> import s3fs
>>> a = "s3://my_bucker/path/to/data_folder/"
>>> dataset = pq.ParquetDataset(a)

它抛出了以下错误:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 502, in __init__
    self.metadata_path) = _make_manifest(path_or_paths, self.fs)
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 601, in _make_manifest
    .format(path))
OSError: Passed non-file path: s3://my_bucker/path/to/data_folder/

根据pyarrow的文档,我尝试使用s3fs作为文件系统,即:

>>> dataset = pq.ParquetDataset(a,filesystem=s3fs)

会抛出以下错误:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 502, in __init__
    self.metadata_path) = _make_manifest(path_or_paths, self.fs)
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 583, in _make_manifest
    if is_string(path_or_paths) and fs.isdir(path_or_paths):
AttributeError: module 's3fs' has no attribute 'isdir'

由于我只能使用ECS集群,因此spark / pyspark不是可选项

有没有办法在python中轻松读取来自这些分区目录的parquet文件?我认为像在这个链接中建议的那样列出所有目录然后进行读取并不是一个好的做法。我需要将读取的数据转换为pandas dataframe以进行进一步处理,因此更喜欢与fastparquet或pyarrow相关的选项。我也可以考虑Python中的其他选项。


1
让我们在 https://issues.apache.org/jira/browse/ARROW-1213 和 https://issues.apache.org/jira/browse/ARROW-1119 中讨论。我们必须添加一些代码,以使 pyarrow 能够识别 s3fs 文件系统,并添加一个 shim / 兼容类来符合 S3FS 略有不同的文件系统 API 以适应 pyarrow 的要求。 - Wes McKinney
5个回答

61

我成功地使用最新版本的fastparquet和s3fs使其工作。以下是相应的代码:

import s3fs
import fastparquet as fp
s3 = s3fs.S3FileSystem()
fs = s3fs.core.S3FileSystem()

#mybucket/data_folder/serial_number=1/cur_date=20-12-2012/abcdsd0324324.snappy.parquet 
s3_path = "mybucket/data_folder/*/*/*.parquet"
all_paths_from_s3 = fs.glob(path=s3_path)

myopen = s3.open
#use s3fs as the filesystem
fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen)
#convert to pandas dataframe
df = fp_obj.to_pandas()

感谢马丁通过我们的对话将我引向正确的方向。

注意:相对于基准测试,这种方法比使用pyarrow更慢。一旦pyarrow支持s3fs,我会更新我的答案,相关信息请参见ARROW-1213

我对使用pyarrow和将文件列表作为fastparquet的全局变量进行单独迭代进行了快速基准测试。使用s3fs的fastparquet比pyarrow更快,而加上我的hackish代码后pyarrow则稍微快一些。但是我认为一旦实现了pyarrow+s3fs,则会更快。

以下是代码和基准测试:

>>> def test_pq():
...     for current_file in list_parquet_files:
...         f = fs.open(current_file)
...         df = pq.read_table(f).to_pandas()
...         # following code is to extract the serial_number & cur_date values so that we can add them to the dataframe
...         #probably not the best way to split :)
...         elements_list=current_file.split('/')
...         for item in elements_list:
...             if item.find(date_partition) != -1:
...                 current_date = item.split('=')[1]
...             elif item.find(dma_partition) != -1:
...                 current_dma = item.split('=')[1]
...         df['serial_number'] = current_dma
...         df['cur_date'] = current_date
...         list_.append(df)
...     frame = pd.concat(list_)
...
>>> timeit.timeit('test_pq()',number =10,globals=globals())
12.078817503992468

>>> def test_fp():
...     fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen)
...     df = fp_obj.to_pandas()

>>> timeit.timeit('test_fp()',number =10,globals=globals())
2.961556333000317

2019年更新:

经过所有PRs、问题的处理,例如Arrow-2038Fast Parquet - PR#182已得到解决。

使用Pyarrow读取parquet文件

# pip install pyarrow
# pip install s3fs

>>> import s3fs
>>> import pyarrow.parquet as pq
>>> fs = s3fs.S3FileSystem()

>>> bucket = 'your-bucket-name'
>>> path = 'directory_name' #if its a directory omit the traling /
>>> bucket_uri = f's3://{bucket}/{path}'
's3://your-bucket-name/directory_name'

>>> dataset = pq.ParquetDataset(bucket_uri, filesystem=fs)
>>> table = dataset.read()
>>> df = table.to_pandas() 

使用 Fast parquet 读取 parquet 文件

# pip install s3fs
# pip install fastparquet

>>> import s3fs
>>> import fastparquet as fp

>>> bucket = 'your-bucket-name'
>>> path = 'directory_name'
>>> root_dir_path = f'{bucket}/{path}'
# the first two wild card represents the 1st,2nd column partitions columns of your data & so forth
>>> s3_path = f"{root_dir_path}/*/*/*.parquet"
>>> all_paths_from_s3 = fs.glob(path=s3_path)

>>> fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen, root=root_dir_path)
>>> df = fp_obj.to_pandas()

快速基准测试

这可能不是最好的基准测试方法。请阅读博客文章以获取全面的基准测试结果。

#pyarrow
>>> import timeit
>>> def test_pq():
...     dataset = pq.ParquetDataset(bucket_uri, filesystem=fs)
...     table = dataset.read()
...     df = table.to_pandas()
...
>>> timeit.timeit('test_pq()',number =10,globals=globals())
1.2677053569998407

#fastparquet
>>> def test_fp():
...     fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen, root=root_dir_path)
...     df = fp_obj.to_pandas()

>>> timeit.timeit('test_fp()',number =10,globals=globals())
2.931876824000028

关于Pyarrow的速度的更多阅读材料。

参考资料:


1
感谢您进行了全面的分析;既然ARROW-1213已经解决了,您有任何新的基准测试要分享吗?谢谢。 - Todor Minakov
我认为在ARROW-1213问题得到解决后,还有一些其他的bug需要解决。请参见https://issues.apache.org/jira/browse/ARROW-2038。同时,如果需要使用pyarrow,我们可以使用类似于https://github.com/apache/arrow/pull/916#issuecomment-337619158中提到的内容。 - stormfield
1
@TodorMinakov 我已经更新了答案和基准测试。 - stormfield
s3fs和pyarrow版本的组合对于使其正常工作至关重要。例如,一个可行的组合是s3fs==0.3.5和pyarrow>=0.14.0。 - Vincent Claes
谢谢!我正在尝试迭代遍历parquet文件,并创建一个Python生成器,其中每个元素包含batch_size行。在使用pq.ParquetDataset()创建的pyarrow表中,是否有类似iter_batches()的等效方法? - haneulkim
显示剩余3条评论

35

对于Python 3.6+,AWS有一个名为aws-data-wrangler的库,可帮助Pandas/S3/Parquet之间的集成

要安装,请执行以下操作;

pip install awswrangler

使用 awswrangler 1.x.x 及以上版本从 S3 中读取分区 Parquet 文件的方法如下:

import awswrangler as wr
df = wr.s3.read_parquet(path="s3://my_bucket/path/to/data_folder/", dataset=True)

通过设置 dataset=True,awswrangler 希望读取分区 parquet 文件。它将从指定的 path 下面的所有分区中读取各个 parquet 文件。


@Vincent_Claes 谢谢您的回复。您如何指定只加载特定的分区?您如何应用该过滤器?库会为您完成吗? - rjurney
@rjurney awswrangler支持对分区进行过滤。您可以在此处找到一些示例:https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/023%20-%20Flexible%20Partitions%20Filter.ipynb - Vincent Claes
谢谢!这对我来说非常有效! - Mojgan Mazouchi
在哪里设置密钥,将数据写入私有的S3存储桶需要密钥。 - 2015evanotes
1
@2015evanotes 你是指KMS密钥吗?如果是的话,这个答案可以帮到你 https://dev59.com/Zrfna4cB1Zd3GeqPysQd#59713720 - Vincent Claes

10

对于那些仅想读取分区parquet文件的 部分 的人,pyarrow接受一个键列表以及仅读取分区部分路径,以便读取所有分区中的所有部分。这种方法对于将parquet数据集按年份或国家等有意义的方式进行分区的组织特别有用,允许用户指定所需文件的哪些部分。长期来看,这将降低成本,因为AWS按字节计费读取数据集。

# Read in user specified partitions of a partitioned parquet file 

import s3fs
import pyarrow.parquet as pq
s3 = s3fs.S3FileSystem()

keys = ['keyname/blah_blah/part-00000-cc2c2113-3985-46ac-9b50-987e9463390e-c000.snappy.parquet'\
         ,'keyname/blah_blah/part-00001-cc2c2113-3985-46ac-9b50-987e9463390e-c000.snappy.parquet'\
         ,'keyname/blah_blah/part-00002-cc2c2113-3985-46ac-9b50-987e9463390e-c000.snappy.parquet'\
         ,'keyname/blah_blah/part-00003-cc2c2113-3985-46ac-9b50-987e9463390e-c000.snappy.parquet']

bucket = 'bucket_yada_yada_yada'

# Add s3 prefix and bucket name to all keys in list
parq_list=[]
for key in keys:
    parq_list.append('s3://'+bucket+'/'+key)

# Create your dataframe
df = pq.ParquetDataset(parq_list, filesystem=s3).read_pandas(columns=['Var1','Var2','Var3']).to_pandas()

1
这是上面所有例子中唯一有效的一个。 - Tampa
你将如何在更高的层次上指定分区?在你的示例中使用keyname或blah_blah。 - CyborgDroid

5

2017年,该拉取请求解决了这个问题。

对于那些希望使用pyarrow仅从S3读取parquet的人,这里是一个示例:

import s3fs
import pyarrow.parquet as pq

fs = s3fs.S3FileSystem()
bucket = "your-bucket"
path = "your-path"

# Python 3.6 or later
p_dataset = pq.ParquetDataset(
    f"s3://{bucket}/{path}",
    filesystem=fs
)
df = p_dataset.read().to_pandas()

# Pre-python 3.6
p_dataset = pq.ParquetDataset(
    "s3://{0}/{1}".format(bucket, path),
    filesystem=fs
)
df = p_dataset.read().to_pandas()

但我认为还有一些问题需要解决。请参见: https://issues.apache.org/jira/browse/ARROW-2038 - stormfield
我认为这并不禁止任何人使用我上面编写的代码来完成提问者所要求的操作。那个讨论与使用上述方法从S3读取parquet文件有什么直接关系呢? - Eugene Brown
我并没有说你的代码不可用。我的意思是,根据 https://github.com/apache/arrow/pull/916#issuecomment-360541307 ,还有一些问题需要解决。从我的理解来看,可能是遗漏了某个特殊情况。因此,在 ARROW-2038 得到解决之前,最好使用 fastparquet 而不是 Arrow。 - stormfield
@efbbrown,你尝试过哪个s3fs和pyarrow版本来解决这个问题? - Vikram Ranabhatt

3

PyArrow 7.0.0对新模块pyarrow.dataset进行了一些改进,旨在将数据集概念从以前的Parquet特定模块pyarrow.parquet.ParquetDataset中抽象出来。

假设您希望从第一个文件中推断数据集架构,则文档中用于读取分区数据集的示例应该可以正常工作。

以下是一个更完整的示例,假设您想使用来自S3的数据:

import pyarrow.dataset as ds
from pyarrow import fs

s3 = fs.S3FileSystem()

dataset = ds.dataset(
    "my-bucket-name/my-path-to-dataset-partitions",
    format="parquet",
    filesystem=s3,
    partitioning="hive"
)

# Assuming your data is partitioned like year=2022/month=4/day=29
# this will only have to read the files for that day

expression = ((ds.field("year") == 2022) & (ds.field("month") == 4) & (ds.field("day") == 29))

pyarrow_table_2022_04_29 = dataset.to_table(filter=expression)

如果您自己定义数据集模式,请注意一个警告。上面的推断使用分区参数会自动将分区添加到数据集模式中

如果您想要手动定义数据集模式并使分区正常工作,必须确保将分区添加到模式中:

import pyarrow as pa

my_manual_schema = pa.schema([])  # Some pyarrow.Schema instance for your dataset

# Be sure to add the partitions even though they're not in the dataset files
my_manual_schema.append(pa.field("year", pa.int16()))
my_manual_schema.append(pa.field("month", pa.int8()))
my_manual_schema.append(pa.field("day", pa.int8()))

dataset = ds.dataset(
    "my-bucket-name/my-path-to-dataset-partitions",
    format="parquet",
    filesystem=s3,
    schema=my_manual_schema,
    partitioning="hive"
)

嗨Nadir,非常好的回答,我尝试使用pd.read_parquet并很失望地发现它由于这个bug无法按预期工作。顺便说一句,我是Brent,曾经与您共事 - 很高兴我偶然发现了您的答案! - lunguini

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接