How to efficiently read the array columns of a tsv file and store each column as a separate npz file?

3

I have a data file that looks like this:

58f0965a62d62099f5c0771d35dbc218        0.868632614612579       [0.028979932889342308, 0.004080114420503378, 0.03757167607545853]       [-0.006008833646774292, -0.010409083217382431, 0.01565541699528694]
36f7859ce47417470bc28384694f0ac4        0.835115909576416       [0.026130573824048042, -0.00358427781611681, 0.06635218113660812]       [-0.06970945745706558, 0.03816794604063034, 0.03491008281707764]
59f7d617bb662155b0d49ce3f27093ed        0.907200276851654       [0.009903069585561752, -0.009721670299768448, 0.0151780480518937]       [-0.03264783322811127, 0.0035394825972616673, -0.05089104175567627]

where the columns are, respectively:

  • the MD5 hash of the data point
  • the target float output
  • an array of floats that I want to read into an np.array object
  • another array of floats that I want to read into an np.array object

I have been reading the file like this to create a matrix for each of the two float-array columns:

import numpy as np
from tqdm import tqdm

import pandas as pd

lol = []
with open('data.tsv') as fin:
    for line in tqdm(fin):
        md5hash, score, vector1, vector2 = line.strip().split('\t')
        row = {'md5_hash': md5hash, 'score':float(score), 
               'vector1': np.array(eval(vector1)), 
               'vector2': np.array(eval(vector2))
              }
        lol.append(row)
        
df = pd.DataFrame(lol)

training_vector1 = np.array(list(df['vector1']))
# Save the training vectors.
np.save('vector1.npz', training_vector1)

training_vector2 = np.array(list(df['vector2']))
# Save the training vectors.
np.save('vector2.npz', training_vector2)

While this approach works for a small dataset, the actual dataset has many more floats per array and close to 200 million rows. Here is a 100-row sample: https://gist.github.com/1f6f0b2501dc334db1e0038d36452f5d

How can I efficiently read the array columns of the tsv file into one npz file per column?


If I run cut -f3 data.tsv, can I then easily read that file with any of the numpy or pandas read functions? - alvas
1
Are you still looking for a solution (other than the one you posted below)? - SultanOrazbayev
1
Note that the data is 200M rows in size. If there is anything more efficient than the solution below, please let me know. - alvas
Are you running out of memory, or is it a performance problem? Do you know the exact number of rows? Since an npz file is just a zip file, a sequential read -> write is easy to implement, but that will not improve performance. - max9111
I don't have the exact number of rows beforehand, but I can run a count before this step if needed. - alvas
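(A minimal sketch of such a count, added here for reference rather than taken from the thread; it streams the file without holding it in memory and assumes the data.tsv file name from the question:)

with open('data.tsv', 'rb') as f:
    # count newline bytes in 1 MB blocks, so memory use stays constant
    num_rows = sum(chunk.count(b'\n') for chunk in iter(lambda: f.read(1 << 20), b''))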
4 Answers

3
First of all, a note on the overall problem: any approach that loads 200M rows of anything resembling the sample input you provided will require roughly 1.1 TB of memory. While that is possible, it is certainly not ideal. Hence, I would not recommend pursuing this, and would instead look into approaches specifically designed for handling large datasets, e.g. HDF5.
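For reference, a minimal sketch of such a streaming HDF5 pipeline (an illustrative addition, not part of the original answer; it assumes h5py is installed, a fixed vector width of 768 and float32 storage) could look like this:

import h5py
import numpy as np


def tsv_to_hdf5(in_path="data.tsv", out_path="vectors.h5", width=768, buffer_rows=10_000):
    # Stream the rows into resizable HDF5 datasets so that the full
    # 200M-row result never has to sit in RAM at once.
    with open(in_path) as fin, h5py.File(out_path, "w") as h5:
        d1 = h5.create_dataset("vector1", shape=(0, width), maxshape=(None, width),
                               dtype="f4", chunks=(1024, width))
        d2 = h5.create_dataset("vector2", shape=(0, width), maxshape=(None, width),
                               dtype="f4", chunks=(1024, width))

        def flush(dset, buf):
            # append the buffered rows at the end of the dataset
            arr = np.asarray(buf, dtype="f4")
            dset.resize(dset.shape[0] + arr.shape[0], axis=0)
            dset[-arr.shape[0]:] = arr

        buf1, buf2 = [], []
        for line in fin:
            _, _, text_r1, text_r2 = line.strip().split("\t")
            buf1.append([float(x) for x in text_r1[1:-1].split(",")])
            buf2.append([float(x) for x in text_r2[1:-1].split(",")])
            if len(buf1) == buffer_rows:
                flush(d1, buf1)
                flush(d2, buf2)
                buf1, buf2 = [], []
        if buf1:
            flush(d1, buf1)
            flush(d2, buf2)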
Having said that, the problem at hand is not particularly complex, but going through pandas and eval() is probably neither desirable nor beneficial.
The same can be said for the cut pre-processing into CSV files that are only marginally easier to read.
Assuming that np.save() is equally fast regardless of how the arrays were produced, we can say that the following function replicates the processing from the OP well:
import numpy as np
import pandas as pd


def process_tsv_OP(filepath="100-translation.embedded-3.tsv"):
    lol = []
    with open(filepath, "r") as fin:
        for line in fin:
            md5hash, score, vector1, vector2 = line.strip().split('\t')
            row = {'md5_hash': md5hash, 'score':float(score), 
                'vector1': np.array(eval(vector1)), 
                'vector2': np.array(eval(vector2))
                }
            lol.append(row)
    df = pd.DataFrame(lol)
    training_vector1 = np.array(list(df['vector1']))
    training_vector2 = np.array(list(df['vector2']))
    return training_vector1, training_vector2

This can be simplified by avoiding pandas and the "evil eval()" (and a number of copies in memory):
def text2row(text):
    text = text[1:-1]
    return [float(x) for x in text.split(',')]


def process_tsv(filepath="100-translation.embedded-3.tsv"):
    with open(filepath, "r") as in_file:
        v1 = []
        v2 = []
        for line in in_file:
            _, _, text_r1, text_r2 = line.strip().split('\t')
            r1 = text2row(text_r1)
            r2 = text2row(text_r2)
            v1.append(r1)
            v2.append(r2)
    v1 = np.array(v1)
    v2 = np.array(v2)
    return v1, v2

It is easy to show that the two produce the same output:

def same_res(x, y):
    return all(np.allclose(i, j) for i, j in zip(x, y))


same_res(process_tsv(), process_tsv_OP())
# True

But the timings are substantially different:
%timeit process_tsv_OP()
# 1 loop, best of 5: 300 ms per loop
%timeit process_tsv()
# 10 loops, best of 5: 86.1 ms per loop

(on the sample input file obtained with wget https://gist.githubusercontent.com/alvations/1f6f0b2501dc334db1e0038d36452f5d/raw/ee31c052a4dbda131df182f0237dbe6e5197dff2/100-translation.embedded-3.tsv)

Pre-processing the input with cut does not seem to be that beneficial:

!time cut -f3 100-translation.embedded-3.tsv | rev | cut -c2- | rev | cut -c2- > vector1.csv
# real  0m0.184s
# user  0m0.102s
# sys   0m0.233s
!time cut -f4 100-translation.embedded-3.tsv | rev | cut -c2- | rev | cut -c2- > vector2.csv
# real  0m0.208s
# user  0m0.113s
# sys   0m0.279s
%timeit np.genfromtxt('vector1.csv', delimiter=','); np.genfromtxt('vector2.csv', delimiter=',')
# 1 loop, best of 5: 130 ms per loop

although some time can be saved by using pd.read_csv():

%timeit pd.read_csv('vector1.csv').to_numpy(); pd.read_csv('vector2.csv').to_numpy()
# 10 loops, best of 5: 85.7 ms per loop

Overall this seems to be slower than the original approach on the provided dataset (although cut itself may scale better for larger inputs).
If you really want to stick to the npy file format, then you would at least want to append the output in chunks. While this is not supported natively by NumPy, you could use NpyAppendArray (see also here). The modified process_tsv() would look something like this:
import os
from npy_append_array import NpyAppendArray


def process_tsv_append(
    in_filepath="100-translation.embedded-3.tsv",
    out1_filepath="out1.npy",
    out2_filepath="out2.npy",
    append_every=10,
):
    # clear output files
    for filepath in (out1_filepath, out2_filepath):
        if os.path.isfile(filepath):
            os.remove(filepath)
    with \
            open(in_filepath, "r") as in_file, \
            NpyAppendArray(out1_filepath) as npaa1, \
            NpyAppendArray(out2_filepath) as npaa2:
        v1 = []
        v2 = []
        for i, line in enumerate(in_file, 1):
            _, _, text_r1, text_r2 = line.strip().split("\t")
            r1 = text2row(text_r1)
            r2 = text2row(text_r2)
            v1.append(r1)
            v2.append(r2)
            if i % append_every == 0:
                npaa1.append(np.array(v1))
                npaa2.append(np.array(v2))
                v1 = []
                v2 = []
        if len(v1) > 0:  # assumes len(v1) == len(v2)
            npaa1.append(np.array(v1))
            npaa2.append(np.array(v2))


process_tsv_append()

v1 = np.load("out1.npy")
v2 = np.load("out2.npy")
same_res(process_tsv(), (v1, v2))
# True
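As a side note (an illustrative addition; this is standard NumPy behaviour rather than a feature of NpyAppendArray), the resulting .npy files can later be opened memory-mapped, so downstream code does not need to load them fully into RAM either:

v1 = np.load("out1.npy", mmap_mode="r")  # lazily memory-mapped
v2 = np.load("out2.npy", mmap_mode="r")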

All of this can be sped up relatively blindly with Cython, but the speed-up seems to be marginal:

%%cython -c-O3 -c-march=native -a
#cython: language_level=3, boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True, infer_types=True


import numpy as np


cpdef text2row_cy(text):
    return [float(x) for x in text[1:-1].split(',')]


cpdef process_tsv_cy(filepath="100-translation.embedded-3.tsv"):
    with open(filepath, "r") as in_file:
        v1 = []
        v2 = []
        for line in in_file:
            _, _, text_r1, text_r2 = line.strip().split('\t')
            r1 = text2row_cy(text_r1)
            r2 = text2row_cy(text_r2)
            v1.append(r1)
            v2.append(r2)
    v1 = np.array(v1)
    v2 = np.array(v2)
    return v1, v2

print(same_res(process_tsv_cy(), process_tsv_OP()))
# True
%timeit process_tsv_cy()
# 10 loops, best of 5: 72.4 ms per loop

Similarly, pre-allocating the arrays does not seem to be of much benefit:
def text2row_out(text, out):
    for i, x in enumerate(text[1:-1].split(',')):
        out[i] = float(x)


def process_tsv_alloc(filepath="100-translation.embedded-3.tsv"):
    with open(filepath, "r") as in_file:
        # num lines
        num_lines = in_file.read().count("\n")
        # num cols
        in_file.seek(0)
        line = next(in_file)
        _, _, text_r1, text_r2 = line.strip().split('\t')
        num_cols1 = len(text_r1.split(","))
        num_cols2 = len(text_r2.split(","))
        # populate arrays
        v1 = np.empty((num_lines, num_cols1))
        v2 = np.empty((num_lines, num_cols2))
        in_file.seek(0)
        for i, line in enumerate(in_file):
            _, _, text_r1, text_r2 = line.strip().split('\t')
            text2row_out(text_r1, v1[i])
            text2row_out(text_r2, v2[i])
    return v1, v2


print(same_res(process_tsv_alloc(), process_tsv_OP()))
%timeit process_tsv_alloc()
# 10 loops, best of 5: 110 ms per loop

Rewriting everything to be closer to C with Numba (and possibly with Cython) can reduce the running time significantly. To make our code compatible with Numba, and to benefit from its speed-ups, we need to make substantial modifications:
  • open the file as bytes (UTF-8 is no longer supported, which is not a major issue for the problem at hand)
  • read and process the file in blocks, with a sufficiently large block size, e.g. on the order of 1M
  • write all the string handling functions by hand, notably the string-to-float conversion
import numpy as np
import numba as nb


@nb.njit
def bytes2int(text):
    c_min = ord("0")
    c_max = ord("9")

    n = len(text)
    valid = n > 0
    # determine sign
    start = n - 1
    stop = -1
    sign = 1
    if valid:
        first = text[0]
        if first == ord("+"):
            stop = 0
        elif first == ord("-"):
            sign = -1
            stop = 0
    # parse rest
    number = 0
    j = 0
    for i in range(start, stop, -1):
        c = text[i]
        if c_min <= c <= c_max:
            number += (c - c_min) * 10 ** j
            j += 1
        else:
            valid = False
            break
    return sign * number if valid else None


@nb.njit
def bytes2float_helper(text):
    sep = ord(".")
    c_min = ord("0")
    c_max = ord("9")

    n = len(text)
    valid = n > 0
    # determine sign
    start = n - 1
    stop = -1
    sign = 1
    if valid:
        first = text[0]
        if first == ord("+"):
            stop = 0
        elif first == ord("-"):
            sign = -1
            stop = 0
    # parse rest
    sep_pos = 0
    number = 0
    j = 0
    for i in range(start, stop, -1):
        c = text[i]
        if c_min <= c <= c_max:
            number += (c - c_min) * 10 ** j
            j += 1
        elif c == sep and sep_pos == 0:
            sep_pos = j
        else:
            valid = False
            break
    return sign * number, sep_pos, valid


@nb.njit
def bytes2float(text):
    exp_chars = b"eE"
    exp_pos = -1
    for exp_char in exp_chars:
        for i, c in enumerate(text[::-1]):
            if c == exp_char:
                exp_pos = i
                break
        if exp_pos > -1:
            break
    if exp_pos > 0:
        exp_number = bytes2int(text[-exp_pos:])
        if exp_number is None:
            exp_number = 0
        number, sep_pos, valid = bytes2float_helper(text[:-exp_pos-1])
        result = number / 10.0 ** (sep_pos - exp_number) if valid else None
    else:
        number, sep_pos, valid = bytes2float_helper(text)
        result = number / 10.0 ** sep_pos if valid else None
    return result


@nb.njit
def btrim(text):
    space = ord(" ")
    tab = ord("\t")
    nl = ord("\n")
    cr = ord("\r")
    start = 0
    stop = 0
    for c in text:
        if c == space or c == tab or c == nl or c == cr:
            start += 1
        else:
            break
    for c in text[::-1]:
        if c == space:
            stop += 1
        else:
            break
    if start == 0 and stop == 0:
        return text
    elif stop == 0:
        return text[start:]
    else:
        return text[start:-stop]


@nb.njit
def text2row_nb(text, sep, num_cols, out, curr_row):
    last_i = 0
    j = 0
    for i, c in enumerate(text):
        if c == sep:
            x = bytes2float(btrim(text[last_i:i]))
            out[curr_row, j] = x
            last_i = i + 2
            j += 1
    x = bytes2float(btrim(text[last_i:]))
    out[curr_row, j] = x


@nb.njit
def process_line(line, psep, sep, num_psep, num_cols1, num_cols2, out1, out2, curr_row):
    if len(line) > 0:
        psep_pos = np.empty(num_psep, dtype=np.int_)
        j = 0
        for i, char in enumerate(line):
            if char == psep:
                psep_pos[j] = i
                j += 1
        text2row_nb(line[psep_pos[-2] + 2:psep_pos[-1] - 1], sep, num_cols1, out1, curr_row)
        text2row_nb(line[psep_pos[-1] + 2:-1], sep, num_cols2, out2, curr_row)


@nb.njit
def decode_block(block, psep, sep, num_lines, num_cols1, num_cols2, out1, out2, curr_row):
    nl = ord("\n")
    last_i = 0
    i = j = 0
    for c in block:
        if c == nl:
            process_line(block[last_i:i], psep, sep, 3, num_cols1, num_cols2, out1, out2, curr_row)
            j += 1
            last_i = i
            curr_row += 1
        if j >= num_lines:
            break
        i += 1
    return block[i + 1:], curr_row


@nb.njit
def count_nl(block, start=0):
    nl = ord("\n")
    for c in block:
        if c == nl:
            start += 1
    return start


def process_tsv_block(filepath="100-translation.embedded-3.tsv", size=2 ** 18):
    with open(filepath, "rb") as in_file:
        # count newlines
        num_lines = 0
        while True:
            block = in_file.read(size)
            if block:
                num_lines = count_nl(block, num_lines)
            else:
                break

        # count num columns
        in_file.seek(0)
        line = next(in_file)
        _, _, text_r1, text_r2 = line.strip().split(b'\t')
        num_cols1 = len(text_r1.split(b","))
        num_cols2 = len(text_r2.split(b","))
        
        # fill output arrays
        v1 = np.empty((num_lines, num_cols1))
        v2 = np.empty((num_lines, num_cols2))
        in_file.seek(0)
        remainder = b""
        curr_row = 0
        while True:
            block = in_file.read(size)
            if block:
                block = remainder + block
                num_lines = count_nl(block)
                if num_lines > 0:
                    remainder, curr_row = decode_block(block, ord("\t"), ord(","), num_lines, num_cols1, num_cols2, v1, v2, curr_row)
                else:
                    remainder = block
            else:
                num_lines = count_nl(remainder)
                if num_lines > 0:
                    remainder, curr_row = decode_block(remainder, ord("\t"), ord(","), num_lines, num_cols1, num_cols2, v1, v2, curr_row)
                break
    return v1, v2

The reward for all this work is a mere ~2x speed-up over process_tsv():
print(same_res(process_tsv_block(), process_tsv_OP()))
# True
%timeit process_tsv_block()
# 10 loops, best of 5: 48.8 ms per loop

2

Cut out the third column and strip the first and last square brackets:

cut -f3 data.tsv | rev | cut -c2- | rev | cut -c2- > vector1.csv

Repeat the same for vector2:

cut -f4 data.tsv | rev | cut -c2- | rev | cut -c2- > vector2.csv

Read the csv files into numpy in Python and save them as npy files.

import numpy as np

np.save('vector1.npy', np.genfromtxt('vector1.csv', delimiter=','))
np.save('vector2.npy', np.genfromtxt('vector2.csv', delimiter=','))

2
The other answers are great; the version below is a variation that uses dask. Since the raw data is in text format, let's use the dask.bag API.
First, import the modules and define a utility function:
from dask.array import from_delayed, from_npy_stack, to_npy_stack, vstack
from dask.bag import read_text
from numpy import array, nan, stack

def process_line(line):
    """Utility function adapted from the snippet in the question."""
    md5hash, score, vector1, vector2 = line.strip().split("\t")
    row = {
        "md5_hash": md5hash,
        "score": float(score),
        "vector1": array(eval(vector1)),
        "vector2": array(eval(vector2)),
    }
    return row

Next, create a bag:
bag = read_text("100-translation.embedded-3.tsv", blocksize="1mb").map(process_line)

Since the sample data is small, to simulate "big data" let's pretend we can only load "1mb" at a time. This should create 3 partitions in the bag.
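As a quick sanity check (an illustrative addition, not part of the original answer), the partition count can be inspected directly:

bag.npartitions
# 3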

Next, isolate the vectors/arrays and convert them into dask.arrays:

# create delayed versions of the arrays
a1 = bag.pluck("vector1").map_partitions(stack).to_delayed()
a2 = bag.pluck("vector2").map_partitions(stack).to_delayed()

# convert the delayed objects to dask array
A1 = vstack(
    [from_delayed(a, shape=(nan, 768), dtype="float") for a in a1],
    allow_unknown_chunksizes=True,
)
A2 = vstack(
    [from_delayed(a, shape=(nan, 768), dtype="float") for a in a2],
    allow_unknown_chunksizes=True,
)

Now, we can save the arrays as npy stacks:
to_npy_stack("_A1", A1)
to_npy_stack("_A2", A2)

Note that this processing is not ideal, since the workers will pass over the data twice (once per array), but with the current API I could not think of a better way.
Also, note that the npy stacks preserve the "unknown" chunks as metadata, even though all the relevant information has been computed. This is something that could be improved in the dask codebase, but for now the easiest fix is to load the data again, compute the chunk sizes, rechunk (to get a nice grid-like structure) and save again:
# rechunk into regular-sized format
A1 = from_npy_stack("_A1")
A1.compute_chunk_sizes()
A1 = A1.rechunk(chunks=(40, 768))
to_npy_stack("A1_final", A1)

# rechunk into regular-sized format
A2 = from_npy_stack("_A2")
A2.compute_chunk_sizes()
A2 = A2.rechunk(chunks=(40, 768))
to_npy_stack("A2_final", A2)

Of course, on the real dataset you will want to use bigger chunks. Also, the final save does not have to go to a numpy stack; depending on your needs, at this point the data could be stored as an HDF5 or zarr array.
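For example, a minimal sketch of that final save going to zarr instead of an npy stack (an illustrative addition; it assumes the zarr package is installed and reuses the rechunked A1/A2 from above):

# write the rechunked dask arrays out as zarr stores
A1.to_zarr("A1_final.zarr")
A2.to_zarr("A2_final.zarr")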


1
If you change the output format to raw binary files, you can process the input file line by line, without keeping the full result in RAM.
import numpy as np

fh_in = open('data.tsv')
fh_vec1 = open('vector1.bin', 'wb')
fh_vec2 = open('vector2.bin', 'wb')

linecount = 0
for line in fh_in:
    hash_, score, vec1, vec2 = line.strip().split('\t')
    np.fromstring(vec1.strip('[]'), sep=',').tofile(fh_vec1)
    np.fromstring(vec2.strip('[]'), sep=',').tofile(fh_vec2)
    linecount += 1

Raw binary files do not store any information about dtype, shape or byte order. To load them back into arrays you can use np.fromfile or np.memmap, and then call .reshape(linecount, -1) on the result.
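As a concrete illustration (a sketch added for reference, assuming the vectors were written as float64 by the code above):

import numpy as np

linecount = 100  # the number of rows written above (use the actual count)
vec1 = np.fromfile('vector1.bin', dtype=np.float64).reshape(linecount, -1)
# or, without reading everything into RAM:
vec1_mm = np.memmap('vector1.bin', dtype=np.float64, mode='r').reshape(linecount, -1)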


1
This probably needs to close the files, or be rewritten to use context managers (i.e. with ...). The risk is that some pending flush could corrupt the end of the vector1.bin or vector2.bin files. - norok2
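A minimal sketch of that suggested rewrite (added for illustration, using the same file names as the answer above):

import numpy as np

linecount = 0
with open('data.tsv') as fh_in, \
        open('vector1.bin', 'wb') as fh_vec1, \
        open('vector2.bin', 'wb') as fh_vec2:
    for line in fh_in:
        hash_, score, vec1, vec2 = line.strip().split('\t')
        np.fromstring(vec1.strip('[]'), sep=',').tofile(fh_vec1)
        np.fromstring(vec2.strip('[]'), sep=',').tofile(fh_vec2)
        linecount += 1
# the with block guarantees all buffers are flushed and the files closed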
