以 Arrow 格式快速编写 NumPy 数组的最佳方法

11

我正在寻找使用pyarrow快速存储和检索numpy数组的方法。对于检索,我感到非常满意。从包含1,000,000,000个dtype = np.uint16整数的.arrow文件中提取列只需要不到1秒钟。

import pyarrow as pa
import numpy as np

def write(arr, name):
    arrays = [pa.array(col) for col in arr]
    names = [str(i) for i in range(len(arrays))]
    batch = pa.RecordBatch.from_arrays(arrays, names=names)
    with pa.OSFile(name, 'wb') as sink:
        with pa.RecordBatchStreamWriter(sink, batch.schema) as writer:
            writer.write_batch(batch)

def read(name):
    source = pa.memory_map(name, 'r')
    table = pa.ipc.RecordBatchStreamReader(source).read_all()
    for i in range(table.num_columns):
        yield table.column(str(i)).to_numpy()

arr = np.random.randint(65535, size=(250, 4000000), dtype=np.uint16)

%%timeit -r 1 -n 1
write(arr, 'test.arrow')
>>> 25.6 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

%%timeit -r 1 -n 1
for n in read('test.arrow'): n
>>> 901 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

能否提高将数据写入.arrow格式的效率?此外,我还测试了np.save

%%timeit -r 1 -n 1
np.save('test.npy', arr)
>>> 18.5 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

看起来速度有点快。我们可以进一步优化Apache Arrow,以更好地将数据写入.arrow格式吗?


3
如果你在使用LZ4压缩时遇到了IO瓶颈,那么将其作为写入选项可能会很有价值。但是这会降低读取速度,因为数据不再是零拷贝的。如果你有多列数据,Arrow应该可以并行压缩它们,所以差别可能不会太大。 - Micah Kornfield
np.random.randint() 返回生成器或类似的惰性结构吗?您是否计时随机数生成以及写入操作?(当我使用pandas写入parquet文件时,即使在HDD上也比这快得多。) - MatBailie
@MatBailie 不,这不是懒惰。我正在计时代码中显示的“写入”和“读取”。我真的很想知道我的测试有什么问题?我也在Google Colab上进行了测试,写入用了16秒,读取用了24毫秒。我还将尝试不同的方法。 - mathfux
2个回答

9

也许性能问题主要是由于IO/磁盘速度造成的。在这种情况下,你没有太多可以改进的。

我在我的设备上运行了一些测试。得到的数字与你的不同。但底线是相同的,写入比读取慢。

生成的文件大小为1.9GB(2000023184字节):

$ ls test.arrow -l
-rw-rw-r-- 1 0x26res 0x26res 2000023184 Nov 15 10:01 test.arrow

在下面的代码中,我生成了1.9 GB的随机字节并将它们保存下来,然后与使用arrow保存所需的时间进行比较:
import secrets

data = b"\x00" + secrets.token_bytes(2000023184)  + b"\x00"

def write_bytes(data, name):
    with open(name, 'wb') as fp:
        fp.write(data)

%%timeit -r 1 -n 1 write_bytes(data, 'test.bytes')
>>> 2.29 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

%%timeit -r 1 -n 1 write(arr, 'test.arrow')
>>> 2.52 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

在我的设备上,使用arrow写入数据需要2.52秒。如果我尝试写入相同数量的随机字节,则需要2.29秒。这意味着arrow的开销占写入时间的约10%,因此无法做太多事情来加速它。


你说得对。这确实是我的RAM/IO/磁盘速度的某种问题。2.5秒真的很好。希望在pyarrow中有一些解决方法。 - mathfux

1

看起来我的RAM/IO/disk有某种限制。非常安静的限制......当arr超过200M项目时,它会将我的写作速度减慢3-8倍,这就是为什么我从2.5秒的速度下降到20秒的原因。我很高兴知道是否可以在pyarrow中解决这个问题。

def pyarrow_write_arrow_Batch(arr, name):
    arrays = [pa.array(col) for col in arr]
    names = [str(i) for i in range(len(arrays))]
    batch = pa.RecordBatch.from_arrays(arrays, names=names)
    with pa.OSFile(name, 'wb') as sink:
        with pa.RecordBatchStreamWriter(sink, batch.schema) as writer:
            writer.write_batch(batch)

%matplotlib notebook
import benchit
benchit.setparams(environ='notebook')
benchit.setparams(rep=5)

arr = np.random.randint(65535, size=(int(1e9),), dtype=np.uint16)
size = [4, 8, 12, 20, 32, 48, 64, 100, 160, 256, 400, 600, 1000]

def pwa_Batch_10000(arr, name): return pyarrow_write_arrow_Batch(arr.reshape(-1, 10000), name)
def pwa_Batch_100000(arr, name): return pyarrow_write_arrow_Batch(arr.reshape(-1, 100000), name)
def pwa_Batch_1000000(arr, name): return pyarrow_write_arrow_Batch(arr.reshape(-1, 1000000), name)
def pwa_Batch_4000000(arr, name): return pyarrow_write_arrow_Batch(arr.reshape(-1, 4000000), name)

fns = [pwa_Batch_10000, pwa_Batch_100000, pwa_Batch_1000000, pwa_Batch_4000000]
in_ = {s: (arr[:s*int(1e6)], 'test.arrow') for s in size}
t = benchit.timings(fns, in_, multivar=True, input_name='Millions of items')
t.plot(logx=True, figsize=(8,4), fontsize=10)

enter image description here


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接