首先,请注意整体问题。
对于与您提供的示例输入格式相同的 2 亿(200M)行数据,无论采用哪种加载方法,都需要大约 1.1 TB 的内存。
虽然这是可能的,但显然并不理想。
因此,我不建议继续进行此操作,而应寻找专门设计用于处理大型数据集的方法,例如
HDF5。
话虽如此,手头的问题并不特别复杂,但通过pandas和eval()传递可能既不可取也没有好处。
用 cut 预处理成 CSV 文件也是同样的道理——得到的 CSV 只是稍微容易读取一些而已。
假设无论数组是如何生成的,np.save() 的速度都相同,那么可以认为下面的函数很好地复现了 OP 中的处理过程:
def process_tsv_OP(filepath="100-translation.embedded-3.tsv"):
    """Reference (baseline) loader reproducing the original poster's approach.

    Each line of the TSV file must hold exactly four tab-separated fields:
    an MD5 hash, a score, and two vectors written as Python list literals.
    The rows are collected into a pandas DataFrame and the two vector
    columns are then converted back into NumPy arrays.

    Args:
        filepath: Path of the TSV file to read.

    Returns:
        A tuple ``(training_vector1, training_vector2)`` of NumPy arrays
        built from the third and fourth column respectively.
    """
    records = []
    with open(filepath, "r") as handle:
        for raw_line in handle:
            fields = raw_line.strip().split('\t')
            md5hash, score, vector1, vector2 = fields
            records.append({
                'md5_hash': md5hash,
                'score': float(score),
                # NOTE: eval() on file content is unsafe for untrusted input;
                # it is kept here only because this function is the baseline
                # the faster variants are compared against.
                'vector1': np.array(eval(vector1)),
                'vector2': np.array(eval(vector2)),
            })
    frame = pd.DataFrame(records)
    return tuple(np.array(list(frame[column])) for column in ('vector1', 'vector2'))
通过避免使用 pandas 和"邪恶的" eval()(以及在内存中进行的大量复制),可以将其简化为:
def text2row(text):
    """Convert a bracketed, comma-separated list of numbers into floats.

    The first and last characters of ``text`` (the enclosing brackets in
    the expected input) are dropped unconditionally; the rest is split on
    commas and every piece is passed to ``float``.

    Args:
        text: A string such as ``"[0.1, 0.2, 0.3]"``.

    Returns:
        A list of floats.
    """
    inner = text[1:-1]
    return list(map(float, inner.split(',')))
def process_tsv(filepath="100-translation.embedded-3.tsv"):
    """Load the two vector columns of a TSV file without pandas or eval().

    Every line must contain exactly four tab-separated fields; the first
    two are ignored and the last two are parsed with ``text2row``.

    Args:
        filepath: Path of the TSV file to read.

    Returns:
        A tuple ``(v1, v2)`` of NumPy arrays holding the parsed third and
        fourth columns, one row per input line.
    """
    rows1, rows2 = [], []
    with open(filepath, "r") as in_file:
        for line in in_file:
            fields = line.strip().split('\t')
            _, _, text_r1, text_r2 = fields
            parsed1 = text2row(text_r1)
            parsed2 = text2row(text_r2)
            rows1.append(parsed1)
            rows2.append(parsed2)
    return np.array(rows1), np.array(rows2)
很容易证明这两个产生相同的输出:
def same_res(x, y):
    """Tell whether two sequences of arrays are pairwise numerically close.

    The sequences are walked in lockstep (stopping at the shorter one) and
    each pair is compared with ``np.allclose``.

    Args:
        x: First sequence of array-likes.
        y: Second sequence of array-likes.

    Returns:
        True if every compared pair is close, False otherwise.
    """
    for left, right in zip(x, y):
        if not np.allclose(left, right):
            return False
    return True
same_res(process_tsv(), process_tsv_OP())
但两者的耗时大不相同:
%timeit process_tsv_OP()
%timeit process_tsv()
(在使用代码
wget https://gist.githubusercontent.com/alvations/1f6f0b2501dc334db1e0038d36452f5d/raw/ee31c052a4dbda131df182f0237dbe6e5197dff2/100-translation.embedded-3.tsv
获取的样本输入文件上)
使用cut
对输入进行预处理似乎并不是很有益:
!time cut -f3 100-translation.embedded-3.tsv | rev | cut -c2- | rev | cut -c2- > vector1.csv
!time cut -f4 100-translation.embedded-3.tsv | rev | cut -c2- | rev | cut -c2- > vector2.csv
%timeit np.genfromtxt('vector1.csv', delimiter=','); np.genfromtxt('vector2.csv', delimiter=',')
虽然使用pd.read_csv()
可以节省一些时间:
%timeit pd.read_csv('vector1.csv').to_numpy(); pd.read_csv('vector2.csv').to_numpy()
# 10 loops, best of 5: 85.7 ms per loop
这似乎比提供的数据集上的原始方法更慢(尽管
cut
本身可能更适合处理更大的输入)。
如果您真的想坚持使用
npy
文件格式,那么您至少希望分块输出。虽然这在NumPy中不受支持,但您可以使用
NpyAppendArray
(也请参见
此处)。修改后的
process_tsv()
将如下所示:
import os
from npy_append_array import NpyAppendArray
def process_tsv_append(
    in_filepath="100-translation.embedded-3.tsv",
    out1_filepath="out1.npy",
    out2_filepath="out2.npy",
    append_every=10,
):
    """Parse a TSV file and write both vector columns to .npy files in chunks.

    Rows are buffered in memory and flushed to the two output files through
    ``NpyAppendArray`` every ``append_every`` input lines, so the full data
    set never has to be held in memory at once.

    Args:
        in_filepath: Path of the TSV file to read.
        out1_filepath: Output path for the third column's vectors.
        out2_filepath: Output path for the fourth column's vectors.
        append_every: Number of input lines to buffer between appends.
    """
    # Remove leftovers from a previous run before appending to these paths.
    for stale_path in (out1_filepath, out2_filepath):
        if os.path.isfile(stale_path):
            os.remove(stale_path)

    with open(in_filepath, "r") as in_file, \
            NpyAppendArray(out1_filepath) as npaa1, \
            NpyAppendArray(out2_filepath) as npaa2:
        chunk1, chunk2 = [], []
        for line_no, line in enumerate(in_file, 1):
            _, _, text_r1, text_r2 = line.strip().split("\t")
            parsed1 = text2row(text_r1)
            parsed2 = text2row(text_r2)
            chunk1.append(parsed1)
            chunk2.append(parsed2)
            if line_no % append_every != 0:
                continue
            # A full chunk has been collected: flush it and start a new one.
            npaa1.append(np.array(chunk1))
            npaa2.append(np.array(chunk2))
            chunk1, chunk2 = [], []
        # Flush whatever is left after the last full chunk.
        if len(chunk1) > 0:
            npaa1.append(np.array(chunk1))
            npaa2.append(np.array(chunk2))
process_tsv_append()
v1 = np.load("out1.npy")
v2 = np.load("out2.npy")
same_res(process_tsv(), (v1, v2))
使用Cython可以相对盲目地加速所有这些内容,但速度提升似乎很小:
%%cython -c-O3 -c-march=native -a
import numpy as np
cpdef text2row_cy(text):
    """Convert a bracketed, comma-separated list of numbers into floats.

    The first and last characters of ``text`` are dropped, the rest is
    split on commas and each piece is converted with ``float``.
    """
    inner = text[1:-1]
    return [float(token) for token in inner.split(',')]
cpdef process_tsv_cy(filepath="100-translation.embedded-3.tsv"):
    """Cython-compiled counterpart of the plain-Python TSV loader.

    Every line must contain exactly four tab-separated fields; the first
    two are ignored and the last two are parsed with ``text2row_cy``.

    Returns a tuple ``(v1, v2)`` of NumPy arrays, one row per input line.
    """
    rows1, rows2 = [], []
    with open(filepath, "r") as in_file:
        for line in in_file:
            fields = line.strip().split('\t')
            _, _, text_r1, text_r2 = fields
            parsed1 = text2row_cy(text_r1)
            parsed2 = text2row_cy(text_r2)
            rows1.append(parsed1)
            rows2.append(parsed2)
    return np.array(rows1), np.array(rows2)
print(same_res(process_tsv_cy(), process_tsv_OP()))
%timeit process_tsv_cy()
同样地,预先分配数组似乎并没有什么好处:
def text2row_out(text, out):
    """Parse a bracketed, comma-separated list of numbers into ``out``.

    The first and last characters of ``text`` are dropped, the rest is
    split on commas, and the i-th piece is stored as a float in ``out[i]``.

    Args:
        text: A string such as ``"[0.1, 0.2, 0.3]"``.
        out: An indexable, writable container (e.g. one row of a NumPy
            array) with at least as many slots as there are values.
    """
    for i, x in enumerate(text[1:-1].split(',')):
        out[i] = float(x)


def process_tsv_alloc(filepath="100-translation.embedded-3.tsv"):
    """Load the two vector columns of a TSV file into preallocated arrays.

    The file is scanned once to learn the number of rows and (from the
    first line) the number of columns of each vector; the arrays are then
    allocated and filled in place during a second scan.

    Lines are counted by iterating over the file rather than by counting
    newline characters of the fully loaded content, so the file is never
    read into memory as a whole and a last line without a trailing newline
    still gets its own row.

    Args:
        filepath: Path of the TSV file to read. Every line must contain
            exactly four tab-separated fields.

    Returns:
        A tuple ``(v1, v2)`` of float NumPy arrays holding the parsed third
        and fourth columns. For an empty file both arrays have shape
        ``(0, 0)``.
    """
    with open(filepath, "r") as in_file:
        first_line = in_file.readline()
        if not first_line:
            # Empty file: there is no first row to derive the widths from.
            return np.empty((0, 0)), np.empty((0, 0))
        _, _, text_r1, text_r2 = first_line.strip().split('\t')
        num_cols1 = len(text_r1.split(","))
        num_cols2 = len(text_r2.split(","))
        # The first line was already consumed; count the remaining ones.
        num_lines = 1 + sum(1 for _ in in_file)
        v1 = np.empty((num_lines, num_cols1))
        v2 = np.empty((num_lines, num_cols2))
        in_file.seek(0)
        for i, line in enumerate(in_file):
            _, _, text_r1, text_r2 = line.strip().split('\t')
            text2row_out(text_r1, v1[i])
            text2row_out(text_r2, v2[i])
    return v1, v2
print(same_res(process_tsv_alloc(), process_tsv_OP()))
%timeit process_tsv_alloc()
使用Numba(以及可能的Cython)将所有内容重写为更接近于C,可以显著减少运行时间。为了使我们的代码与Numba兼容,并从其加速中受益,我们需要进行重大修改:
- 以字节形式打开文件(不再支持UTF-8,这对手头的问题不是很重要)
- 分块读取和处理文件,块大小应足够大,例如1M的数量级
- 手动编写所有字符串处理函数,特别是字符串转浮点数的转换
import numpy as np
import numba as nb
@nb.njit
def bytes2int(text):
    """Parse an optionally signed decimal integer from a byte sequence.

    The digits are accumulated from the last byte backwards. A leading
    ``+`` or ``-`` is recognised and excluded from the digit scan.

    Args:
        text: Byte sequence to parse.

    Returns:
        The parsed integer, or None if ``text`` is empty or contains a
        byte that is not a decimal digit (apart from the leading sign).
    """
    zero = ord("0")
    nine = ord("9")
    size = len(text)
    ok = size > 0
    lower = -1  # exclusive lower bound of the backward scan
    sign = 1
    if ok:
        head = text[0]
        if head == ord("-"):
            sign = -1
            lower = 0
        elif head == ord("+"):
            lower = 0
    value = 0
    power = 0
    for pos in range(size - 1, lower, -1):
        ch = text[pos]
        if ch < zero or ch > nine:
            ok = False
            break
        value += (ch - zero) * 10 ** power
        power += 1
    return sign * value if ok else None
@nb.njit
def bytes2float_helper(text):
    """Parse the mantissa of a decimal number from a byte sequence.

    The bytes are scanned from the end backwards. Digits are accumulated
    into one integer as if the decimal point were absent, and the number of
    digits seen before reaching the point is recorded.

    Args:
        text: Byte sequence holding an optional sign, digits and at most
            one recognised ``.``.

    Returns:
        A tuple ``(number, sep_pos, valid)``: the signed integer formed by
        all digits, the count of digits to the right of the decimal point
        (0 if none was recorded), and whether parsing succeeded.
    """
    dot = ord(".")
    zero = ord("0")
    nine = ord("9")
    size = len(text)
    ok = size > 0
    lower = -1  # exclusive lower bound of the backward scan
    sign = 1
    if ok:
        head = text[0]
        if head == ord("-"):
            sign = -1
            lower = 0
        elif head == ord("+"):
            lower = 0
    dot_pos = 0
    value = 0
    power = 0
    for pos in range(size - 1, lower, -1):
        ch = text[pos]
        if zero <= ch <= nine:
            value += (ch - zero) * 10 ** power
            power += 1
        elif ch == dot and dot_pos == 0:
            dot_pos = power
        else:
            ok = False
            break
    return sign * value, dot_pos, ok
@nb.njit
def bytes2float(text):
    """Parse a decimal floating point number, with optional exponent.

    The text is searched from the end for an ``e`` (and, failing that, an
    ``E``). When one is found away from the very end, the part after it is
    parsed as an integer exponent (an unparsable exponent counts as 0) and
    the part before it as the mantissa.

    Args:
        text: Byte sequence to parse.

    Returns:
        The parsed float, or None when the mantissa is not valid.
    """
    markers = b"eE"
    from_end = -1  # offset of the exponent marker counted from the end
    for marker in markers:
        for offset, ch in enumerate(text[::-1]):
            if ch == marker:
                from_end = offset
                break
        if from_end > -1:
            break
    if from_end <= 0:
        # No usable exponent part: treat the whole text as the mantissa.
        mantissa, dot_pos, ok = bytes2float_helper(text)
        return mantissa / 10.0 ** dot_pos if ok else None
    exponent = bytes2int(text[-from_end:])
    if exponent is None:
        exponent = 0
    mantissa, dot_pos, ok = bytes2float_helper(text[:-from_end-1])
    return mantissa / 10.0 ** (dot_pos - exponent) if ok else None
@nb.njit
def btrim(text):
    """Strip leading and trailing ASCII whitespace from a byte sequence.

    Space, tab, line feed and carriage return are removed from both ends.
    (Previously only spaces were removed from the end, while all four
    characters were removed from the start.)

    Args:
        text: Byte sequence to trim.

    Returns:
        ``text`` itself when nothing needs trimming, otherwise a slice of
        it without the surrounding whitespace.
    """
    space = ord(" ")
    tab = ord("\t")
    nl = ord("\n")
    cr = ord("\r")
    # Number of whitespace bytes at the start.
    start = 0
    for c in text:
        if c == space or c == tab or c == nl or c == cr:
            start += 1
        else:
            break
    # Number of whitespace bytes at the end (same character set as above).
    stop = 0
    for c in text[::-1]:
        if c == space or c == tab or c == nl or c == cr:
            stop += 1
        else:
            break
    if start == 0 and stop == 0:
        return text
    elif stop == 0:
        # A "-0" upper bound would yield an empty slice, hence this branch.
        return text[start:]
    else:
        return text[start:-stop]
@nb.njit
def text2row_nb(text, sep, num_cols, out, curr_row):
    """Parse separator-delimited numbers from ``text`` into one row of ``out``.

    Each value is trimmed with ``btrim`` and converted with ``bytes2float``
    before being stored at ``out[curr_row, j]`` for increasing ``j``.

    The next value starts right after the separator; any whitespace that
    follows the separator is removed by ``btrim``. (Previously the start
    was set two bytes past the separator, which dropped the first character
    of a value when the separator was not followed by a space.)

    Args:
        text: Byte sequence with the values, without enclosing brackets.
        sep: Byte value of the separator (e.g. ``ord(",")``).
        num_cols: Number of columns of the row; not used by this function.
        out: 2D array that receives the parsed values.
        curr_row: Index of the row of ``out`` to fill.
    """
    last_i = 0
    j = 0
    for i, c in enumerate(text):
        if c == sep:
            x = bytes2float(btrim(text[last_i:i]))
            out[curr_row, j] = x
            last_i = i + 1
            j += 1
    # The last value is not followed by a separator.
    x = bytes2float(btrim(text[last_i:]))
    out[curr_row, j] = x
@nb.njit
def process_line(line, psep, sep, num_psep, num_cols1, num_cols2, out1, out2, curr_row):
    """Parse the last two fields of one line into row ``curr_row``.

    The positions of the primary separator ``psep`` are collected first;
    the field between the last two of them goes to ``out1`` and the field
    after the last one goes to ``out2``. In both cases the first and last
    byte of the field (the enclosing brackets in the expected input) are
    skipped. Empty lines are ignored.

    Args:
        line: Byte sequence of one line, without its terminating newline.
        psep: Byte value separating the fields of the line.
        sep: Byte value separating the numbers inside a field.
        num_psep: Size of the buffer for separator positions; presumably
            the number of ``psep`` occurrences per line - confirm at caller.
        num_cols1: Passed through to ``text2row_nb`` for the first vector.
        num_cols2: Passed through to ``text2row_nb`` for the second vector.
        out1: 2D array receiving the first vector.
        out2: 2D array receiving the second vector.
        curr_row: Index of the row to fill in both arrays.
    """
    if len(line) == 0:
        return
    found_at = np.empty(num_psep, dtype=np.int_)
    found = 0
    for pos, byte in enumerate(line):
        if byte == psep:
            found_at[found] = pos
            found += 1
    second_last = found_at[-2]
    last = found_at[-1]
    # "+ 2" skips the separator and the opening bracket; "- 1" / ":-1"
    # leave out the closing bracket.
    text2row_nb(line[second_last + 2:last - 1], sep, num_cols1, out1, curr_row)
    text2row_nb(line[last + 2:-1], sep, num_cols2, out2, curr_row)
@nb.njit
def decode_block(block, psep, sep, num_lines, num_cols1, num_cols2, out1, out2, curr_row):
    """Parse up to ``num_lines`` newline-terminated lines of ``block``.

    Every complete line is handed to ``process_line`` (with a separator
    buffer size of 3) and ``curr_row`` is advanced by one per line.

    Each line now starts right after the previous newline. (Previously it
    started at the newline itself, so every line but the first carried a
    leading newline byte and blank lines were never seen as empty by
    ``process_line``.)

    Args:
        block: Byte sequence to decode.
        psep: Byte value separating the fields of a line.
        sep: Byte value separating the numbers inside a field.
        num_lines: Maximum number of lines to process.
        num_cols1: Passed through to ``process_line``.
        num_cols2: Passed through to ``process_line``.
        out1: 2D array receiving the first vector of each line.
        out2: 2D array receiving the second vector of each line.
        curr_row: Index of the first row to fill.

    Returns:
        A tuple ``(remainder, curr_row)``: the bytes following the last
        processed newline, and the index of the next row to fill.
    """
    nl = ord("\n")
    last_i = 0
    i = j = 0
    for c in block:
        if c == nl:
            process_line(block[last_i:i], psep, sep, 3, num_cols1, num_cols2, out1, out2, curr_row)
            j += 1
            last_i = i + 1
            curr_row += 1
            if j >= num_lines:
                # Leave i on the last processed newline so that the slice
                # below starts right after it.
                break
        i += 1
    return block[i + 1:], curr_row
@nb.njit
def count_nl(block, start=0):
    """Count the newline bytes in ``block`` on top of a running total.

    Args:
        block: Byte sequence to scan.
        start: Count to begin from, so totals can be accumulated over
            several blocks.

    Returns:
        ``start`` plus the number of newline bytes found in ``block``.
    """
    newline = ord("\n")
    total = start
    for byte in block:
        if byte == newline:
            total += 1
    return total
def process_tsv_block(filepath="100-translation.embedded-3.tsv", size=2 ** 18):
    """Load the two vector columns of a TSV file block by block.

    The file is opened in binary mode and read twice in blocks of ``size``
    bytes: first to count the rows, then to decode them with
    ``decode_block`` into preallocated arrays. The incomplete line at the
    end of a block is carried over and prepended to the next block.

    A last line that is not terminated by a newline gets its own row and is
    decoded after the final block. (Previously it was silently dropped,
    because the carried-over remainder never contains a newline.)

    Args:
        filepath: Path of the TSV file to read. Every line must contain
            four tab-separated fields, the last two being bracketed,
            comma-separated lists of numbers.
        size: Number of bytes to read per block.

    Returns:
        A tuple ``(v1, v2)`` of float NumPy arrays with one row per input
        line. For an empty file both arrays have shape ``(0, 0)``.
    """
    psep = ord("\t")
    sep = ord(",")
    with open(filepath, "rb") as in_file:
        # First pass: count the newline-terminated rows.
        num_rows = 0
        last_byte = b""
        while True:
            block = in_file.read(size)
            if not block:
                break
            num_rows = count_nl(block, num_rows)
            last_byte = block[-1:]
        if not last_byte:
            # Empty file: there is no first row to derive the widths from.
            return np.empty((0, 0)), np.empty((0, 0))
        if last_byte != b"\n":
            # Reserve a row for the unterminated last line.
            num_rows += 1

        # The first line determines the number of columns of each vector.
        in_file.seek(0)
        first_line = in_file.readline()
        _, _, text_r1, text_r2 = first_line.strip().split(b'\t')
        num_cols1 = len(text_r1.split(b","))
        num_cols2 = len(text_r2.split(b","))
        v1 = np.empty((num_rows, num_cols1))
        v2 = np.empty((num_rows, num_cols2))

        # Second pass: decode block by block.
        in_file.seek(0)
        remainder = b""
        curr_row = 0
        while True:
            block = in_file.read(size)
            if not block:
                break
            block = remainder + block
            num_lines = count_nl(block)
            if num_lines > 0:
                remainder, curr_row = decode_block(
                    block, psep, sep, num_lines,
                    num_cols1, num_cols2, v1, v2, curr_row)
            else:
                # No complete line yet: keep accumulating.
                remainder = block
        if remainder.strip():
            # Unterminated last line: terminate it so it can be decoded.
            _, curr_row = decode_block(
                remainder + b"\n", psep, sep, 1,
                num_cols1, num_cols2, v1, v2, curr_row)
    if curr_row < num_rows:
        # A whitespace-only tail was counted as a row but not decoded.
        v1 = v1[:curr_row]
        v2 = v2[:curr_row]
    return v1, v2
所有这些工作换来的回报,仅仅是比 process_tsv() 快大约两倍:
print(same_res(process_tsv_block(), process_tsv_OP()))
%timeit process_tsv_block()
如果我使用 cut -f3 data.tsv 命令,那么我能否使用任何 numpy 或 pandas 的读取函数轻松地读取该文件? - alvas