Here is some demo code that shows what can be achieved by adjusting the algorithm. It is pure numpy and, on the sample data you posted, about 35x faster than the original version (roughly 2.5 seconds for ~1,000,000 samples on my rather low-end machine):
>>> result_dict = master('run')
[('load', 0.82578349113464355), ('precomp', 0.028138399124145508), ('max/min', 0.24333405494689941), ('ABCD', 0.015314102172851562), ('main', 1.3356468677520752)]
TOTAL 2.44821691513
Modifications used:

- A+B+C+D, see my other answer for details
- running min/max, which includes avoiding repeated evaluation of (A+B+C+D - 4Cmin) / (4dif) with the same Cmin / dif

These modifications are more or less routine. That leaves the comparison with data2a/b, which is an expensive O(NK), where N is the number of samples and K the size of the window.

Here we can take advantage of the data being relatively well-behaved. Using the running min/max one can create variants of data2a/b that test a whole range of window offsets at once: if the test fails, all those offsets can be ruled out immediately; otherwise the range is bisected. A toy sketch of the underlying idea follows; the full implementation comes after it.
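The sketch below (array sizes and values are arbitrary, not taken from your data) shows why such a filter allows bulk rejection: the running max of the upper bound over n_offs shifts dominates the bound at every one of those offsets simultaneously, so a single comparison against the filter gives an upper bound on the per-offset counts.

import numpy as np

K, n_offs = 6, 3
bound = np.random.random(K)
# filt_j = max of bound over all indices i that can line up with position j
# for some window offset 0 <= o < n_offs, i.e. i in [j-n_offs+1, j]
filt = np.array([bound[max(0, j - n_offs + 1):j + 1].max()
                 for j in range(K + n_offs - 1)])
# the filter dominates the bound at every offset, so any sample that is
# below the bound in some window is necessarily below the filter
for o in range(n_offs):
    assert np.all(filt[o:o + K] >= bound)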
import numpy as np
import time
# global storage for the pre-screening filters, keyed by chunk size
preA, preB = {}, {}
CHUNK_SIZES = None
def sliding_argmax(data, K=2000):
    """compute the argmax of data over a sliding window of width K

    returns:
        indices -- indices into data
        switches -- window offsets at which the maximum changes
                    (strictly speaking: where the index of the maximum changes)
                    excludes 0 but includes maximum offset (len(data)-K+1)

    see last line of compute_pre_screening_filter for a recipe to convert
    this representation to the vector of maxima
    """
    N = len(data)
    last = np.argmax(data[:K])
    indices = [last]
    while indices[-1] <= N - 1:
        ge = np.where(data[last + 1 : last + K + 1] > data[last])[0]
        if len(ge) == 0:
            if last + K >= N:
                break
            last += 1 + np.argmax(data[last + 1 : last + K + 1])
            indices.append(last)
        else:
            last += 1 + ge[0]
            indices.append(last)
    indices = np.array(indices)
    switches = np.where(data[indices[1:]] > data[indices[:-1]],
                        indices[1:] + (1-K), indices[:-1] + 1)
    return indices, np.r_[switches, [len(data)-K+1]]
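# Worked toy example (values arbitrary): expanding (indices, switches) to the
# vector of window maxima with repeat, the recipe the docstring points to:
#   >>> d = np.array([3., 1., 4., 1., 5., 9., 2., 6.])
#   >>> idx, sw = sliding_argmax(d, K=3)
#   >>> d[idx].repeat(np.diff(np.r_[[0], sw]))
#   array([4., 4., 5., 9., 9., 9.])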
def compute_pre_screening_filter(bound, n_offs):
    """compute pre-screening filter for point-wise upper bound

    given a K-vector of upper bounds B and K+n_offs-1-vector data
    compute K+n_offs-1-vector filter such that for each index j
    if for any offset 0 <= o < n_offs and index 0 <= i < K such that
    o + i = j, the inequality B_i >= data_j holds, then filter_j >= data_j

    therefore the number of data points below filter is an upper bound for
    the maximum number of points below bound in any K-window in data
    """
    pad_l, pad_r = np.min(bound[:n_offs-1]), np.min(bound[1-n_offs:])
    padded = np.r_[pad_l+np.zeros(n_offs-1,), bound, pad_r+np.zeros(n_offs-1,)]
    indices, switches = sliding_argmax(padded, n_offs)
    return padded[indices].repeat(np.diff(np.r_[[0], switches]))
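# Toy example (not from the question's data): for bound = [1., 3., 2.] and
# n_offs = 2 this returns [1., 3., 3., 2.]; e.g. filter_2 = max(bound_1,
# bound_2) = 3 covers index i=2 at offset o=0 and i=1 at offset o=1.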
def compute_all_pre_screening_filters(upper, lower, min_chnk=5, dyads=6):
    """compute upper and lower pre-screening filters for data blocks of
    sizes K+n_offs-1 where
        n_offs = min_chnk, 2min_chnk, ..., 2^(dyads-1)min_chnk

    the result is stored in the global variables preA and preB
    """
    global CHUNK_SIZES

    CHUNK_SIZES = min_chnk * 2**np.arange(dyads)
    preA[1] = upper
    preB[1] = lower
    for n in CHUNK_SIZES:
        preA[n] = compute_pre_screening_filter(upper, n)
        preB[n] = -compute_pre_screening_filter(-lower, n)
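# With the defaults min_chnk=5, dyads=6 this precomputes filters for the
# dyadic chunk sizes CHUNK_SIZES = [5, 10, 20, 40, 80, 160]; preA[1]/preB[1]
# hold the exact bounds used once a chunk can no longer be bulk-rejected.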
def test_bounds(block, counts, threshold=400):
    """test whether the windows fitting in the data block 'block' fall
    within the bounds, using pre-screening for efficient bulk rejection

    array 'counts' will be overwritten with the counts of compliant samples

    note that accurate counts will only be returned for above-threshold
    windows, because the analysis of bulk-rejected windows is short-circuited

    also note that bulk rejection only works for 'well behaved' data and
    for example not on random numbers
    """
    N = len(counts)
    K = len(preA[1])
    r = N % CHUNK_SIZES[0]
    # odd bits at the start that do not fill a chunk are counted one window
    # at a time
    counts[:r] = [np.count_nonzero((block[l:l+K] <= preA[1]) &
                                   (block[l:l+K] >= preB[1]))
                  for l in range(r)]

    def bisect(block, counts):
        M = len(counts)
        cnts = np.count_nonzero((block <= preA[M]) & (block >= preB[M]))
        if cnts < threshold:
            # bulk rejection: the filter count bounds every window count
            counts[:] = cnts
            return
        elif M == CHUNK_SIZES[0]:
            # smallest chunk size reached: count each window exactly
            counts[:] = [np.count_nonzero((block[l:l+K] <= preA[1]) &
                                          (block[l:l+K] >= preB[1]))
                         for l in range(M)]
        else:
            M //= 2
            bisect(block[:-M], counts[:M])
            bisect(block[M:], counts[M:])

    N = N // CHUNK_SIZES[0]
    for M in CHUNK_SIZES:
        if N % 2:
            bisect(block[r:r+M+K-1], counts[r:r+M])
            r += M
        elif N == 0:
            return
        N //= 2
    else:
        # for-else: reached when all dyadic sizes are consumed without the
        # return above firing; the remaining 2*N chunks all use the largest
        # precomputed size M
        for j in range(2*N):
            bisect(block[r:r+M+K-1], counts[r:r+M])
            r += M
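# Hypothetical smoke test (synthetic smooth data, illustrative sizes only):
#   K = 50
#   x = np.sin(np.linspace(0, 20, 5000))
#   compute_all_pre_screening_filters(0.5 + np.zeros(K), -0.5 + np.zeros(K),
#                                     min_chnk=5, dyads=4)
#   counts = np.empty(len(x) - K + 1, dtype=int)
#   test_bounds(x, counts, threshold=25)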
def analyse(data, use_pre_screening=True, min_chnk=5, dyads=6,
            threshold=400):
    samples, upper, lower = data
    N, K = samples.shape[0], upper.shape[0]
    times = [time.time()]
    if use_pre_screening:
        compute_all_pre_screening_filters(upper, lower, min_chnk, dyads)
    times.append(time.time())
    upper_inds, upper_swp = sliding_argmax(samples[:, 1], K)
    lower_inds, lower_swp = sliding_argmax(-samples[:, 2], K)
    times.append(time.time())
    ABCD = samples.sum(axis=-1)
    times.append(time.time())
    counts = np.empty((N-K+1,), dtype=int)
    offs = 0
    u_ind, u_scale, u_swp = 0, samples[upper_inds[0], 1], upper_swp[0]
    l_ind, l_scale, l_swp = 0, samples[lower_inds[0], 2], lower_swp[0]
    while True:
        if u_swp > l_swp:
            # the running minimum switches first: process offsets up to l_swp
            block = (ABCD[offs:l_swp+K-1] - 4*l_scale) \
                    * (0.25 / (u_scale-l_scale))
            if use_pre_screening:
                test_bounds(block, counts[offs:l_swp], threshold=threshold)
            else:
                counts[offs:l_swp] = [
                    np.count_nonzero((block[l:l+K] <= upper) &
                                     (block[l:l+K] >= lower))
                    for l in range(l_swp - offs)]
            l_ind += 1
            offs = l_swp
            l_swp = lower_swp[l_ind]
            l_scale = samples[lower_inds[l_ind], 2]
        else:
            # the running maximum switches first: process offsets up to u_swp
            block = (ABCD[offs:u_swp+K-1] - 4*l_scale) \
                    * (0.25 / (u_scale-l_scale))
            if use_pre_screening:
                test_bounds(block, counts[offs:u_swp], threshold=threshold)
            else:
                counts[offs:u_swp] = [
                    np.count_nonzero((block[l:l+K] <= upper) &
                                     (block[l:l+K] >= lower))
                    for l in range(u_swp - offs)]
            u_ind += 1
            if u_ind == len(upper_inds):
                assert u_swp == N-K+1
                break
            offs = u_swp
            u_swp = upper_swp[u_ind]
            u_scale = samples[upper_inds[u_ind], 1]
    times.append(time.time())
    return {'counts': counts, 'valid': np.where(counts >= 400)[0],
            'timings': np.diff(times)}
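# Hypothetical direct call (shapes as analyse expects: samples is (N, 4),
# upper/lower are length-K vectors; names are illustrative):
#   res = analyse((samples, upper, lower), use_pre_screening=True)
#   res['valid']   # window offsets with at least 400 compliant samples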
def master(mode='calibrate', data='fake', use_pre_screening=True, nrep=3,
           min_chnk=None, dyads=None):
    t = time.time()
    if data in ('fake', 'load'):
        data1 = np.loadtxt('data1.csv', delimiter=';', skiprows=1,
                           usecols=[1,2,3,4])
        data2a = np.loadtxt('data2a.csv', delimiter=';', skiprows=1,
                            usecols=[1])
        data2b = np.loadtxt('data2b.csv', delimiter=';', skiprows=1,
                            usecols=[1])
        if data == 'fake':
            data1 = np.tile(data1, (10, 1))
        threshold = 400
    elif data == 'random':
        data1 = np.random.random((102000, 4))
        data2b = np.random.random(2000)
        data2a = np.random.random(2000)
        threshold = 490
        if use_pre_screening or mode == 'calibrate':
            print('WARNING: pre-screening not efficient on artificial data')
    else:
        raise ValueError("data mode {} not recognised".format(data))
    data = data1, data2a, data2b
    t_load = time.time() - t
    if mode == 'calibrate':
        min_chnk = (2, 3, 4, 5, 6) if min_chnk is None else min_chnk
        dyads = (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) if dyads is None else dyads
        timings = np.zeros((len(min_chnk), len(dyads)))
        print('max bisect ' + ' '.join([
            '   n.a.' if dy == 0 else '{:7d}'.format(dy) for dy in dyads]),
            end='')
        for i, mc in enumerate(min_chnk):
            print('\nmin chunk {}'.format(mc), end=' ')
            for j, dy in enumerate(dyads):
                for k in range(nrep):
                    if dy == 0:
                        timings[i, j] += analyse(
                            data, False, mc, dy, threshold)['timings'][3]
                    else:
                        timings[i, j] += analyse(
                            data, True, mc, dy, threshold)['timings'][3]
                timings[i, j] /= nrep
                print('{:7.3f}'.format(timings[i, j]), end=' ', flush=True)
        best_mc, best_dy = np.unravel_index(np.argmin(timings.ravel()),
                                            timings.shape)
        print('\nbest', min_chnk[best_mc], dyads[best_dy])
        return timings, min_chnk[best_mc], dyads[best_dy]
    if mode == 'run':
        min_chnk = 2 if min_chnk is None else min_chnk
        dyads = 5 if dyads is None else dyads
        res = analyse(data, use_pre_screening, min_chnk, dyads, threshold)
        times = np.r_[[t_load], res['timings']]
        print(list(zip(('load', 'precomp', 'max/min', 'ABCD', 'main'),
                       times)))
        print('TOTAL', times.sum())
        return res
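To tune min_chnk and dyads for a new data set, the calibration mode can be used; a sketch (same CSV files as master loads above):

>>> timings, best_mc, best_dy = master('calibrate', data='fake')
>>> # prints a grid of 'main' timings and the best (min chunk, dyads) pair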
Pre-allocate the output array resultArray and then index into it to update at each iteration, instead of starting from an empty list and using the slow append. - Divakar
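A minimal sketch of the pre-allocation pattern suggested in that comment (names, sizes, and the placeholder computation are illustrative, not from the answer):

import numpy as np

n = 1_000_000
result = np.empty(n)         # allocate the output once, up front
for i in range(n):
    result[i] = i * 0.5      # index-assign instead of list.append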