注意:我已将此扩展为有关Python和Parquet的综合指南,您可以在此帖子中了解更多。
Parquet格式分区
为了使用过滤器,您需要使用分区将数据存储在Parquet格式中。与CSV相比,仅加载少量的Parquet列和分区可能会对I/O性能产生巨大的改进。Parquet可以基于一个或多个字段的值对文件进行分区,并为嵌套值的唯一组合创建目录树,或者只为一个分区列创建一个目录集合。 PySpark Parquet文档很好地解释了Parquet的工作原理。
根据性别和国家进行的分区将如下所示(链接):
path
└── to
└── table
├── gender=male
│ ├── ...
│ │
│ ├── country=US
│ │ └── data.parquet
│ ├── country=CN
│ │ └── data.parquet
│ └── ...
如果需要进一步对数据进行分区,还可以使用行组分区,但大多数工具仅支持指定行组大小,您必须自己执行 key-->row group
查找,这很麻烦(我很乐意在另一个问题中回答有关此问题的问题)。
使用Pandas编写分区数据
您需要使用Parquet对数据进行分区,然后可以使用过滤器加载数据。您可以使用PyArrow、pandas或Dask或PySpark来编写分区数据以处理大型数据集。
例如,使用pandas编写分区数据:
df.to_parquet(
path='analytics.xxx',
engine='pyarrow',
compression='snappy',
columns=['col1', 'col5'],
partition_cols=['event_name', 'event_category']
)
这将文件布置如下:
analytics.xxx/event_name=SomeEvent/event_category=SomeCategory/part-0001.c000.snappy.parquet
analytics.xxx/event_name=SomeEvent/event_category=OtherCategory/part-0001.c000.snappy.parquet
analytics.xxx/event_name=OtherEvent/event_category=SomeCategory/part-0001.c000.snappy.parquet
analytics.xxx/event_name=OtherEvent/event_category=OtherCategory/part-0001.c000.snappy.parquet
在PyArrow中加载Parquet分区
要使用分区列按一个属性获取事件,您可以将一个元组过滤器放入到列表中:
import pyarrow.parquet as pq
import s3fs
fs = s3fs.S3FileSystem()
dataset = pq.ParquetDataset(
's3://analytics.xxx',
filesystem=fs,
validate_schema=False,
filters=[('event_name', '=', 'SomeEvent')]
)
df = dataset.to_table(
columns=['col1', 'col5']
).to_pandas()
使用逻辑与(AND)进行过滤
如果要使用逻辑与(AND)抓取具有两个或更多属性的事件,只需创建一个过滤器元组列表:
import pyarrow.parquet as pq
import s3fs
fs = s3fs.S3FileSystem()
dataset = pq.ParquetDataset(
's3://analytics.xxx',
filesystem=fs,
validate_schema=False,
filters=[
('event_name', '=', 'SomeEvent'),
('event_category', '=', 'SomeCategory')
]
)
df = dataset.to_table(
columns=['col1', 'col5']
).to_pandas()
使用逻辑OR进行筛选
要使用OR抓取两个事件,您需要将过滤器元组嵌套在它们自己的列表中:
import pyarrow.parquet as pq
import s3fs
fs = s3fs.S3FileSystem()
dataset = pq.ParquetDataset(
's3://analytics.xxx',
filesystem=fs,
validate_schema=False,
filters=[
[('event_name', '=', 'SomeEvent')],
[('event_name', '=', 'OtherEvent')]
]
)
df = dataset.to_table(
columns=['col1', 'col5']
).to_pandas()
使用AWS Data Wrangler加载Parquet分区
正如其他答案所提到的,加载数据并仅对特定分区中的某些列进行过滤(无论数据存储在本地还是云端)最简单的方法是使用awswrangler
模块。如果您使用S3,请查阅awswrangler.s3.read_parquet()
和awswrangler.s3.to_parquet()
的文档。过滤操作与上述示例相同。
import awswrangler as wr
df = wr.s3.read_parquet(
path="analytics.xxx",
columns=["event_name"],
filters=[('event_name', '=', 'SomeEvent')]
)
使用pyarrow.parquet.read_table()
加载Parquet分区
如果您正在使用PyArrow,您也可以使用pyarrow.parquet.read_table()
:
import pyarrow.parquet as pq
fp = pq.read_table(
source='analytics.xxx',
use_threads=True,
columns=['some_event', 'some_category'],
filters=[('event_name', '=', 'SomeEvent')]
)
df = fp.to_pandas()
使用 PySpark 加载 Parquet 分区
最后,在 PySpark 中,您可以使用 pyspark.sql.DataFrameReader.read_parquet()
方法来加载 Parquet 分区。
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]") \
.appName('Stack Overflow Example Parquet Column Load') \
.getOrCreate()
df = spark.read.parquet('s3://analytics.xxx') \
.select('event_name', 'event_category') \
.filter(F.col('event_name') == 'SomeEvent')
希望这可以帮助您使用Parquet :)