我有一种hacky的方法可以使用boto3
(1.4.4)、pyarrow
(0.4.1)和pandas
(0.20.3)来实现。
首先,我可以像这样本地读取单个parquet文件:
import pyarrow.parquet as pq
path = 'parquet/part-r-00000-1e638be4-e31f-498a-a359-47d017a0059c.gz.parquet'
table = pq.read_table(path)
df = table.to_pandas()
我也可以像这样本地读取一个parquet文件目录:
import pyarrow.parquet as pq
dataset = pq.ParquetDataset('parquet/')
table = dataset.read()
df = table.to_pandas()
两者都非常好用。现在我想通过远程方式使用存储在S3存储桶中的文件实现相同的效果。我希望像这样的东西能够起作用:
dataset = pq.ParquetDataset('s3n://dsn/to/my/bucket')
但是它却不行:
OSError: 传递的不是文件路径:s3n://dsn/to/my/bucket
仔细阅读pyarrow文档后,目前似乎不可能实现。因此,我提出了以下解决方案:
从S3中读取单个文件并获取pandas数据帧:
import io
import boto3
import pyarrow.parquet as pq
buffer = io.BytesIO()
s3 = boto3.resource('s3')
s3_object = s3.Object('bucket-name', 'key/to/parquet/file.gz.parquet')
s3_object.download_fileobj(buffer)
table = pq.read_table(buffer)
df = table.to_pandas()
这是我的笨拙、不够优化的解决方案,通过S3文件夹路径创建pandas数据框:
import io
import boto3
import pandas as pd
import pyarrow.parquet as pq
bucket_name = 'bucket-name'
def download_s3_parquet_file(s3, bucket, key):
buffer = io.BytesIO()
s3.Object(bucket, key).download_fileobj(buffer)
return buffer
client = boto3.client('s3')
s3 = boto3.resource('s3')
objects_dict = client.list_objects_v2(Bucket=bucket_name, Prefix='my/folder/prefix')
s3_keys = [item['Key'] for item in objects_dict['Contents'] if item['Key'].endswith('.parquet')]
buffers = [download_s3_parquet_file(s3, bucket_name, key) for key in s3_keys]
dfs = [pq.read_table(buffer).to_pandas() for buffer in buffers]
df = pd.concat(dfs, ignore_index=True)
有没有更好的方法可以实现这个?也许可以使用 pyarrow 连接 pandas 的某种方式?我想避免使用 pyspark
,但如果没有其他解决方案,我会使用它。
dfs = [pq.read_table(buffer).to_pandas() for buffer in buffers if len(buffer.getvalue()) > 0]
。谢谢!不幸的是,这里的答案并没有完全解决这个问题。 - Wassadamo