不读入内存的情况下读写 Parquet 文件(使用 Python)。

7

我查看了标准文档,我期望找到符合我需求的内容(Apache ArrowPandas),但似乎无法解决问题。

我最熟悉Python,因此希望使用Python,但这不是必须的要求。

问题

我需要将 Parquet 文件从一个位置(URL)移动到另一个位置(Azure 存储帐户,在此情况下使用 Azure 机器学习平台,但这与我的问题无关)。

由于这些文件太大,不能仅仅执行 pd.read_parquet("https://my-file-location.parquet"),因为这会将整个文件读取到对象中。

期望

我认为应该有一种简单的方法来创建一个文件对象,并按行流式传输该对象 - 或者可能是按列块进行传输。就像这样:

import pyarrow.parquet as pq

with pq.open("https://my-file-location.parquet") as read_file_handle:
    with pq.open("https://my-azure-storage-account/my-file.parquet", "write") as write_filehandle:
        for next_line in read_file_handle{
            write_file_handle.append(next_line)

我知道这会与主要用于以列方式访问Parquet的方式略有不同。也许有一种配置对象,我需要传递它来指定感兴趣的列,或者可以在一个块中获取多少行或类似的内容。

但是关键的期望是:有一种方法可以访问Parquet文件而无需将其全部加载到内存中。我该如何做到这一点?

顺便说一下,我尝试过只使用Python标准的open函数,但我不确定如何在URL位置和字节流中使用open。如果可能仅通过open跳过任何特定于Parquet的内容来完成此操作,那也可以。

更新

一些评论建议使用类似于bash的脚本,例如这里。如果没有其他选择,我可以使用这个方法,但这并不理想,因为:

  • 我更喜欢将所有内容都保留在完整的语言SDK中,无论是Python、Go还是任何其他语言。如果解决方案转移到具有管道的bash脚本中,由于最终解决方案不会完全编写bash、Powershell或任何脚本语言,因此需要进行外部调用。
  • 我真的想利用Parquet本身的一些优点。如我在下面的评论中所述,Parquet是列存储。因此,如果我有一个包含11亿行和100列的“数据框”,但我只关心3列,则希望能够仅下载这3列,从而节省大量时间和一些金钱。

这个回答解决了你的问题吗?将互联网上的文件下载到S3存储桶中 - Be Chiller Too
如果您想要进行即时处理,那么这将显著改变计算方式;将其视为纯二进制可能会更快,甚至可能快得多,但仅适用于没有处理的直接复制。 - Jiří Baum
@MikeWilliamson,你解决了吗?我也遇到了类似的问题,但是发现ParquetFile不接受URL作为源,所以有什么建议吗? - av abhishiek
@avabhishiek,不,我从未解决过它。相反,我绕过了它。我不记得是因为那是一段时间以前的事情,但我认为我的玩具没有完全捕捉到这个问题:我不仅仅是读取和立即写入。还有一些过滤或其他操作。如果我没记错的话,我使用了一个UDF来处理它。UDF在幕后使用pandas,并且它们以块的形式获取数据。很抱歉我记不起更多了。 - Mike Williamson
显示剩余2条评论
4个回答

9

很棒的文章,基于 @Micah 的回答,我也加入了我的看法,以防您不想阅读文档。以下是一个小片段:

import pandas as pd
import numpy as np
from pyarrow.parquet import ParquetFile

# create a random df then save to parquet
df = pd.DataFrame({
    'A': np.arange(10000),
    'B': np.arange(10000),
    'C': np.arange(10000),
    'D': np.arange(10000),
})
df.to_parquet('./test/test')

# ****** below is @Micah Kornfield's answer ******
# 1. open parquet file
batch = ParquetFile('./test/test')
# 2. define generator of batches
record = batch.iter_batches(
    batch_size=10,
    columns=['B', 'C'],
)
# 3. yield pandas/numpy data
print(next(record).to_pandas()) # pandas
print(next(record).to_pydict()) # native python dict

这个脚本如果./test目录不存在,会崩溃。我建议改用./test.parquet作为目标路径。 - mb7744

5
这是可能的,但需要一些工作,因为除了列式存储之外,Parquet还需要一个模式。
大致的工作流程如下:
  1. 打开一个parquet文件以供阅读。

  2. 然后使用iter_batches逐步读取行块(您还可以传递要从文件中读取的特定列以节省IO/CPU)。

  3. 然后,您可以进一步转换来自iter_batches的每个pa.RecordBatch。完成第一个批次的转换后,您可以获取其schema并创建新的ParquetWriter

  4. 对于每个转换的批次,请调用write_table。您必须首先将其转换为pa.Table

  5. 关闭文件。

Parquet需要随机访问,因此无法轻松地从URI进行流式传输(如果通过HTTP FSSpec打开文件,则pyarrow应支持它),但我认为您可能会在写入方面受阻。

2
批处理大小对于管理内存非常重要。另请参阅https://dev59.com/-r3pa4cB1Zd3GeqPaT3P#64469365 - Micah Kornfield
1
谢谢,@Micah!是的,我知道这会很棘手,需要一个模式。我发现在SO上有时候写较短的问题比较长的问题更好,如果我不是真的知道自己在问什么的话。;)无论如何,iter_batches正是我要找的东西。我感到有点傻没有看到它。我会努力尝试把它放在正确的位置。 - Mike Williamson
怎样关闭文件?在源代码中,ParquetFile类中有close()方法,但是它不起作用。pq_file = pq.ParquetFile(path)然后for batch in pq_file: break然后pq_file.close()但是不起作用。 - undefined

2
无需将整个文件读入内存即可进行Parquet到Parquet的转换。
在本主题的已接受 答案 的基础上。
import pyarrow as pa
import pyarrow.parquet as pq

parquet_file = pq.ParquetFile('read.parquet')
new_schema = pa.schema([
    ('a', pa.int32()),
    ('b', pa.int32()),
    ('c', pa.int32()),
])
## get arrow schema from parquet file instead of hard coding it.
#arrow_schema = parquet_file.schema_arrow 
with pq.ParquetWriter('write.parquet', schema=new_schema) as writer:
    # iter_batches lets you filter by certain columns or certain row groups as well
    for batch in parquet_file.iter_batches(batch_size=100000):
        df = batch.to_pandas()
        # transformation: transform df by adding a new static column with column name c and value 9999999
        df['c'] = 9999999
        # convert pandas df to record batch
        # schema will be inferred if not provided 
        transformed_batch = pa.RecordBatch.from_pandas(df, schema=new_schema)
        writer.write_batch(transformed_batch)

所有文档都在被接受的答案中链接了。

enter image description here

注意:不要将批处理大小设置得太低。这会导致压缩效果差,因为批处理大小也对新的Parquet文件中的行组大小产生影响。
CSV转换为Parquet而无需读取整个文件到内存中,请查看此答案- https://dev59.com/FVUL5IYBdhLWcg3wQWOD#74258957

0
请注意,我没有指定如何在远程服务器端使用批处理的实现方式。
我的解决方案是:使用pyarrow.NativeFile将批处理写入缓冲区,然后使用pyarrow.ipc.RecordBatchFileReader读取缓冲区。
我创建了这两个类来帮助您进行流式处理。
import asyncio
from pyarrow.parquet import ParquetFile


class ParquetFileStreamer:
    """
    Attributes:
        ip_address: ip address of the distant server
        port: listening port of the distant server
        n_bytes: -1 means read whole batch
        file_source: pathlib.Path, pyarrow.NativeFile, or file-like object
        batch_size: default = 65536
        columns: list of the columns you wish to select (if None selects all)

    Example:
        >>> pfs = ParquetFileStreamer
        >>> class MyStreamer(ParquetFileStreamer)
                file_source = '/usr/fromage/camembert.parquet
                columns = ['name', 'price']
        >>> MyStreamer.start_stream()
    """
    ip_address = '192.168.1.1'
    port = 80
    n_bytes = -1

    file_source: str
    batch_size = 65536
    columns = []

    @classmethod
    def start_stream(cls):
        for batch in cls._open_parquet():
            asyncio.run(cls._stream_parquet(batch))

    @classmethod
    def _open_parquet(cls):
        return ParquetFile(cls.file_source).iter_batches(
            batch_size=cls.batch_size,
            columns=cls.columns
        )

    @classmethod
    async def _stream_parquet(cls, batch):
        reader, writer = await asyncio.open_connection(cls.ip_address, cls.port)
        writer.write(batch)
        await writer.drain()
        await reader.read()
        writer.close()
        await writer.wait_closed()


class ParquetFileReceiver:
    """
    Attributes: \n
        port: specify the port \n
        n_bytes: -1 reads all the batch
    Example:
        >>> pfr = ParquetFileReceiver
        >>> asyncio.run(pfr.server())
    """
    port = 80
    n_bytes = -1

    @classmethod
    async def handle_stream(cls, reader, writer):
        data = await reader.read(cls.n_bytes)
        batch = data.decode()
        print(batch)

    @classmethod
    async def server(cls):
        server = await asyncio.start_server(cls.handle_stream, port=cls.port)
        async with server:
            await server.serve_forever()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接