使用谓词从pyarrow.parquet.ParquetDataset中筛选行

28

我有一个存储在s3上的镶木地板数据集,我想从数据集中查询特定的行。我能够使用petastorm做到这一点,但现在我想仅使用pyarrow来完成。

以下是我的尝试:

import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

dataset = pq.ParquetDataset(
    'analytics.xxx', 
    filesystem=fs, 
    validate_schema=False, 
    filters=[('event_name', '=', 'SomeEvent')]
)

df = dataset.read_pandas().to_pandas()

但是它返回的pandas DataFrame好像过滤器没有起作用,也就是说我有各种不同event_name值的行。我是否遗漏了什么或者理解有误?我可以在获取pandas DataFrame后进行筛选,但会使用比实际需要更多的内存空间。

4个回答

29

注意:我已将此扩展为有关Python和Parquet的综合指南,您可以在此帖子中了解更多。

Parquet格式分区

为了使用过滤器,您需要使用分区将数据存储在Parquet格式中。与CSV相比,仅加载少量的Parquet列和分区可能会对I/O性能产生巨大的改进。Parquet可以基于一个或多个字段的值对文件进行分区,并为嵌套值的唯一组合创建目录树,或者只为一个分区列创建一个目录集合。 PySpark Parquet文档很好地解释了Parquet的工作原理。

根据性别和国家进行的分区将如下所示(链接)

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...

如果需要进一步对数据进行分区,还可以使用行组分区,但大多数工具仅支持指定行组大小,您必须自己执行 key-->row group 查找,这很麻烦(我很乐意在另一个问题中回答有关此问题的问题)。

使用Pandas编写分区数据

您需要使用Parquet对数据进行分区,然后可以使用过滤器加载数据。您可以使用PyArrow、pandas或DaskPySpark来编写分区数据以处理大型数据集。

例如,使用pandas编写分区数据:

df.to_parquet(
    path='analytics.xxx', 
    engine='pyarrow',
    compression='snappy',
    columns=['col1', 'col5'],
    partition_cols=['event_name', 'event_category']
)

这将文件布置如下:

analytics.xxx/event_name=SomeEvent/event_category=SomeCategory/part-0001.c000.snappy.parquet
analytics.xxx/event_name=SomeEvent/event_category=OtherCategory/part-0001.c000.snappy.parquet
analytics.xxx/event_name=OtherEvent/event_category=SomeCategory/part-0001.c000.snappy.parquet
analytics.xxx/event_name=OtherEvent/event_category=OtherCategory/part-0001.c000.snappy.parquet

在PyArrow中加载Parquet分区

要使用分区列按一个属性获取事件,您可以将一个元组过滤器放入到列表中:

import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

dataset = pq.ParquetDataset(
    's3://analytics.xxx', 
    filesystem=fs, 
    validate_schema=False, 
    filters=[('event_name', '=', 'SomeEvent')]
)
df = dataset.to_table(
    columns=['col1', 'col5']
).to_pandas()

使用逻辑与(AND)进行过滤

如果要使用逻辑与(AND)抓取具有两个或更多属性的事件,只需创建一个过滤器元组列表:

import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

dataset = pq.ParquetDataset(
    's3://analytics.xxx', 
    filesystem=fs, 
    validate_schema=False, 
    filters=[
        ('event_name',     '=', 'SomeEvent'),
        ('event_category', '=', 'SomeCategory')
    ]
)
df = dataset.to_table(
    columns=['col1', 'col5']
).to_pandas()

使用逻辑OR进行筛选

要使用OR抓取两个事件,您需要将过滤器元组嵌套在它们自己的列表中:

import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

dataset = pq.ParquetDataset(
    's3://analytics.xxx', 
    filesystem=fs, 
    validate_schema=False, 
    filters=[
        [('event_name', '=', 'SomeEvent')],
        [('event_name', '=', 'OtherEvent')]
    ]
)
df = dataset.to_table(
    columns=['col1', 'col5']
).to_pandas()

使用AWS Data Wrangler加载Parquet分区

正如其他答案所提到的,加载数据并仅对特定分区中的某些列进行过滤(无论数据存储在本地还是云端)最简单的方法是使用awswrangler模块。如果您使用S3,请查阅awswrangler.s3.read_parquet()awswrangler.s3.to_parquet()的文档。过滤操作与上述示例相同。

import awswrangler as wr

df = wr.s3.read_parquet(
    path="analytics.xxx",
    columns=["event_name"], 
    filters=[('event_name', '=', 'SomeEvent')]
)

使用pyarrow.parquet.read_table()加载Parquet分区

如果您正在使用PyArrow,您也可以使用pyarrow.parquet.read_table()

import pyarrow.parquet as pq

fp = pq.read_table(
    source='analytics.xxx',
    use_threads=True,
    columns=['some_event', 'some_category'],
    filters=[('event_name', '=', 'SomeEvent')]
)
df = fp.to_pandas()

使用 PySpark 加载 Parquet 分区

最后,在 PySpark 中,您可以使用 pyspark.sql.DataFrameReader.read_parquet() 方法来加载 Parquet 分区。

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]") \
                    .appName('Stack Overflow Example Parquet Column Load') \
                    .getOrCreate()

# I automagically employ Parquet structure to load the selected columns and partitions
df = spark.read.parquet('s3://analytics.xxx') \
          .select('event_name', 'event_category') \
          .filter(F.col('event_name') == 'SomeEvent')

希望这可以帮助您使用Parquet :)


Medium文章的链接已损坏。 - Jason S
1
@JasonS 已解决:https://blog.datasyndrome.com/python-and-parquet-performance-e71da65269ce - rjurney

18

对于从Google搜索到这里的任何人,现在您可以在使用PyArrow读取Parquet文件时按行进行过滤,无论是通过pandas还是pyarrow.parquet。

根据文档

filters(List[Tuple]或List[List[Tuple]]或None(默认)) - 不符合过滤谓词的行将从扫描的数据中删除。 嵌套目录结构中嵌入的分区键将被利用,以避免加载不包含匹配行的文件。 如果use_legacy_dataset为True,则过滤器只能引用分区键,并且仅支持类似hive的目录结构。 当将use_legacy_dataset设置为False时,还支持文件内级别的过滤和不同的分区方案。

谓词采用析取范式(DNF)表示,例如[[('x','=',0),... ],...]。 DNF允许单列谓词的任意布尔逻辑组合。 最内部元组各描述一个单列谓词。 内部谓词列表被解释为合取(AND),形成更具选择性和多列谓词。 最后,最外部的列表将这些过滤器组合为一个析取(OR)。

谓词也可以被传递为List[Tuple]的形式。这种形式被解释为单个连词。要在谓词中表示“OR”,必须使用(首选)List[List[Tuple]]表示法。


5

目前,filters功能只在文件级别上实现,而不是行级别。

因此,如果您有一个作为多个分区parquet文件集合的数据集(这里描述了分区数据集的类型:https://arrow.apache.org/docs/python/parquet.html#partitioned-datasets-multiple-files),您可以使用filters参数仅读取文件的子集。
但是,您还无法仅使用该功能读取单个文件的子集行组(请参见https://issues.apache.org/jira/browse/ARROW-1796)。

但是,如果指定了无效的过滤器,将会收到错误消息,这应该是很好的。我为此提出了问题:https://issues.apache.org/jira/browse/ARROW-5572


好的,明白了!我应该更多地考虑如何组织数据,以便能够进行更有效的查询。是的,有一个错误信息确实非常不错,感谢汇报。 - kluu
1
嗨,你提出的问题似乎已经解决了...但我测试了代码,它仍然没有抛出错误...https://issues.apache.org/jira/browse/ARROW-5572 - Praveen Kulkarni

5

对于Python 3.6+,AWS有一个名为aws-data-wrangler的库,它有助于Pandas / S3 / Parquet之间的集成,并允许您在分区S3密钥上进行过滤。

要安装,请执行以下操作;

pip install awswrangler

为了减少读取的数据量,您可以基于Parquet文件上S3中分区列过滤行。要过滤分区列event_name值为"SomeEvent"的行,请执行以下操作;

对于awswrangler < 1.0.0

import awswrangler as wr

df = wr.pandas.read_parquet(
         path="s3://my-bucket/my/path/to/parquet-file.parquet",
         columns=["event_name"], 
         filters=[('event_name', '=', 'SomeEvent')]
)

for awswrangler > 1.0.0 do;

import awswrangler as wr

df = wr.s3.read_parquet(
         path="s3://my-bucket/my/path/to/parquet-file.parquet",
         columns=["event_name"], 
         filters=[('event_name', '=', 'SomeEvent')]
)

我刚尝试使用这个示例,但是出现了以下错误: AttributeError: 'NoneType' object has no attribute 'filter_accepts_partition' - irwinr
看起来有一个 bug;https://github.com/awslabs/aws-data-wrangler/issues/267 - Vincent Claes
1
这正是我担心的。实际上,那是我提交的错误报告。 :) - irwinr
你可以尝试使用旧版本,看看它会带来什么? - Vincent Claes
2
看起来这并不是真正的错误。对我的错误报告的回应是:“不幸的是,AWS Data Wrangler不支持在物理列上进行过滤,只支持在分区上进行过滤。(文档已在上述提交中更新)。这不仅仅是传递use_legacy_dataset=False的问题,似乎新的数据集方法也不支持接收boto3会话。”也许将这个答案编辑一下,强调过滤只能在分区上工作,而不能在物理列上工作,这可能是一个好主意? - irwinr

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接