如何使用pyarrow从S3读取一组parquet文件并将其转换为pandas dataframe?

80

我有一种hacky的方法可以使用boto3(1.4.4)、pyarrow(0.4.1)和pandas(0.20.3)来实现。

首先,我可以像这样本地读取单个parquet文件:

import pyarrow.parquet as pq

path = 'parquet/part-r-00000-1e638be4-e31f-498a-a359-47d017a0059c.gz.parquet'
table = pq.read_table(path)
df = table.to_pandas()

我也可以像这样本地读取一个parquet文件目录:

import pyarrow.parquet as pq

dataset = pq.ParquetDataset('parquet/')
table = dataset.read()
df = table.to_pandas()

两者都非常好用。现在我想通过远程方式使用存储在S3存储桶中的文件实现相同的效果。我希望像这样的东西能够起作用:

dataset = pq.ParquetDataset('s3n://dsn/to/my/bucket')

但是它却不行:

OSError: 传递的不是文件路径:s3n://dsn/to/my/bucket

仔细阅读pyarrow文档后,目前似乎不可能实现。因此,我提出了以下解决方案:

从S3中读取单个文件并获取pandas数据帧:

import io
import boto3
import pyarrow.parquet as pq

buffer = io.BytesIO()
s3 = boto3.resource('s3')
s3_object = s3.Object('bucket-name', 'key/to/parquet/file.gz.parquet')
s3_object.download_fileobj(buffer)
table = pq.read_table(buffer)
df = table.to_pandas()

这是我的笨拙、不够优化的解决方案,通过S3文件夹路径创建pandas数据框:

import io
import boto3
import pandas as pd
import pyarrow.parquet as pq

bucket_name = 'bucket-name'
def download_s3_parquet_file(s3, bucket, key):
    buffer = io.BytesIO()
    s3.Object(bucket, key).download_fileobj(buffer)
    return buffer

client = boto3.client('s3')
s3 = boto3.resource('s3')
objects_dict = client.list_objects_v2(Bucket=bucket_name, Prefix='my/folder/prefix')
s3_keys = [item['Key'] for item in objects_dict['Contents'] if item['Key'].endswith('.parquet')]
buffers = [download_s3_parquet_file(s3, bucket_name, key) for key in s3_keys]
dfs = [pq.read_table(buffer).to_pandas() for buffer in buffers]
df = pd.concat(dfs, ignore_index=True)

有没有更好的方法可以实现这个?也许可以使用 pyarrow 连接 pandas 的某种方式?我想避免使用 pyspark,但如果没有其他解决方案,我会使用它。


1
你考虑过用dask来读取它们吗?我可以用两行代码完成相同的操作。 - rpanai
只有在筛选文件大小> 0的文件后,才能使您的hacky解决方案正常工作: dfs = [pq.read_table(buffer).to_pandas() for buffer in buffers if len(buffer.getvalue()) > 0]。谢谢!不幸的是,这里的答案并没有完全解决这个问题。 - Wassadamo
9个回答

74

你应该使用由yjk21建议的s3fs模块。但是当调用ParquetDataset后,您将获得一个pyarrow.parquet.ParquetDataset对象。要获取Pandas DataFrame,您将需要应用.read_pandas().to_pandas()

import pyarrow.parquet as pq
import s3fs
s3 = s3fs.S3FileSystem()

pandas_dataframe = pq.ParquetDataset('s3://your-bucket/', filesystem=s3).read_pandas().to_pandas()

出现错误:ValueError:在中间目录中发现文件。有什么想法吗? - Mithril
3
你这样做会发生什么?它会流式传输还是复制到本地?如果我打开一个10GB的文件两次,它会下载两次吗?如果我只有5GB的本地存储空间,打开一个10GB的文件会发生什么,它会流式传输还是全部下载? - citynorman
2
当我指定包含所有parquet文件的键时,我会收到“ArrowIOError:无效的Parquet文件大小为0字节”的错误。当我明确指定parquet文件时,它可以正常工作。我使用s3fs == 0.3.5和pyarrow == 0.15.0。@vak,你有什么想法,为什么我不能像你一样读取s3键中的所有parquet文件? - Vincent Claes
@VincentClaes 后面加斜杠? - vak
可以使用pq.ParquetFile()在AWS S3中读取parquet文件吗?如果可以,与使用pq.ParquetDataset()相比,使用它的优缺点是什么? - undefined
显示剩余3条评论

39

谢谢!你的问题实际上告诉了我很多东西。我现在是这样使用 pandas (0.21.1)的,它会调用 pyarrowboto3(1.3.1)。

import boto3
import io
import pandas as pd

# Read single parquet file from S3
def pd_read_s3_parquet(key, bucket, s3_client=None, **args):
    if s3_client is None:
        s3_client = boto3.client('s3')
    obj = s3_client.get_object(Bucket=bucket, Key=key)
    return pd.read_parquet(io.BytesIO(obj['Body'].read()), **args)

# Read multiple parquets from a folder on S3 generated by spark
def pd_read_s3_multiple_parquets(filepath, bucket, s3=None, 
                                 s3_client=None, verbose=False, **args):
    if not filepath.endswith('/'):
        filepath = filepath + '/'  # Add '/' to the end
    if s3_client is None:
        s3_client = boto3.client('s3')
    if s3 is None:
        s3 = boto3.resource('s3')
    s3_keys = [item.key for item in s3.Bucket(bucket).objects.filter(Prefix=filepath)
               if item.key.endswith('.parquet')]
    if not s3_keys:
        print('No parquet found in', bucket, filepath)
    elif verbose:
        print('Load parquets:')
        for p in s3_keys: 
            print(p)
    dfs = [pd_read_s3_parquet(key, bucket=bucket, s3_client=s3_client, **args) 
           for key in s3_keys]
    return pd.concat(dfs, ignore_index=True)

那么您可以通过以下方法从S3中读取文件夹下的多个parquets:

df = pd_read_s3_multiple_parquets('path/to/folder', 'my_bucket')

我想这段代码可以大大简化。


15

使用boto3也可以完成,而无需使用pyarrow

import boto3
import io
import pandas as pd

# Read the parquet file
buffer = io.BytesIO()
s3 = boto3.resource('s3')
object = s3.Object('bucket_name','key')
object.download_fileobj(buffer)
df = pd.read_parquet(buffer)

print(df.head())

2
我得到了AttributeError: 's3.Object' object has no attribute 'download_fileobj' - Louis Yang
1
我认为张贴者的意思是“不使用s3fs”,无论如何,如果您需要BytesIO缓冲区,则这是一个很好的答案。 - Brian Wylie

14

只要您设置了正确的软件包

$ pip install pandas==1.1.0 pyarrow==1.0.0 s3fs==0.4.2

并且您的AWS共享配置和凭证文件已被正确配置

您可以立即使用pandas

import pandas as pd

df = pd.read_parquet("s3://bucket/key.parquet")
在使用多个 AWS 配置文件时,您可能还需要进行设置。
$ export AWS_DEFAULT_PROFILE=profile_under_which_the_bucket_is_accessible

这样你就可以访问你的存储桶。


3
谢谢!这绝对应该是被接受的答案。 - Nano Tellez
新版的Pandas现在可以直接读取s3路径了。虽然不知为何,当我尝试时速度比使用io.BytesIO方法要慢很多。 - Louis Yang
Install s3fs to access S3 - jtlz2
无法将此与Poetry一起使用,s3fs对botocore的依赖项已固定。 - eljusticiero67

12

这个解决方案唯一的问题是,如果需要在集群上分发,就无法实现。 - rpanai
我的问题是我得到一个错误,提示文件太多。可能是因为dask无法处理它,因此需要采用另一种解决方案吗? - msarafzadeh
我的parquet文件目录中有0字节的_SUCCESS头文件。尝试在glob中排除它们:dd.read_parquet('s3://bucket/test.parquet/[!_]*'),但结果是IndexError: list index out of range。可能是因为没有匹配项或一些0大小的文件。 - Wassadamo
pip install dask[dataframe] - jtlz2

9

如果你也愿意使用 AWS Data Wrangler

import awswrangler as wr

df = wr.s3.read_parquet(path="s3://...")

我不知道为什么,但读取数据需要很长时间吗?例如,对于仅有1.3 MB的数据集,需要6秒钟的时间? - kanav anand
6
您的示例将期望一个Parquet文件。您需要设置参数 dataset = True 以读取Parquet文件列表。 - Vincent Claes
由于某种原因,我遇到了超时错误。 df = wr.s3.read_parquet(path="s3://......../", path_suffix = ".snappy.parquet", dataset=True) "errorMessage": "................................ 任务在3.14秒后超时"有任何建议... - NNM

4
您可以使用Dask中的s3fs实现与S3的文件系统接口。然后,您可以像这样使用ParquetDataset的filesystem参数:
import s3fs
s3 = s3fs.S3FileSystem()
dataset = pq.ParquetDataset('s3n://dsn/to/my/bucket', filesystem=s3)

1

使用预签名URL

s3 =s3fs.S3FileSystem(key='your_key',secret='your_secret',client_kwargs={"endpoint_url":'your_end_point'})

df = dd.read_parquet(s3.url('your_bucket' + 'your_filepath',expires=3600,client_method='get_object'))

0

我尝试了@oya163的解决方案,它有效,但稍作更改后

import boto3
import io
import pandas as pd

# Read the parquet file
buffer = io.BytesIO()
s3 = boto3.resource('s3',aws_access_key_id='123',aws_secret_access_key= '456')
object = s3.Object('bucket_name','myoutput.parquet')
object.download_fileobj(buffer)
df = pd.read_parquet(buffer)

print(df.head())   

目前你的回答不够清晰,请编辑并添加更多细节,以帮助其他人理解它如何回答问题。你可以在帮助中心找到有关如何编写好答案的更多信息。 - Community

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接