如何加速嵌套循环?

10
我正在使用Python进行嵌套循环,如下所示。这是一种搜索现有金融时间序列并查找符合某些特征的时间段的基本方法。
在这种情况下,有两个单独但大小相等的数组,分别表示“收盘价”(即资产价格)和“成交量”(即在该期间内交易的资产数量)。对于每个时间段,我想向前查看所有长度在1到 INTERVAL_LENGTH 之间的未来间隔,并查看是否有任何这些间隔具有符合我的搜索条件的特征(在这种情况下,收盘价比率大于1.0001且小于1.5,总成交量大于100)。
我理解使用NumPy时加速的主要原因之一是解释器在评估任何内容时不需要每次检查操作数类型,只要您将其作为整体操作于数组中(例如 numpy_array * 2 ),但显然下面的代码没有利用这一点。
有没有一种方式可以用某种窗口函数替换内部循环,从而导致加速,或者使用 numpy / scipy 的其他方式在本机Python中实现大幅加速?
另外,是否有更好的方法来完成此操作(例如,使用C ++编写此循环并使用weave会更快吗)?
ARRAY_LENGTH = 500000
INTERVAL_LENGTH = 15
close = np.array( xrange(ARRAY_LENGTH) )
volume = np.array( xrange(ARRAY_LENGTH) )
close, volume = close.astype('float64'), volume.astype('float64')

results = []
for i in xrange(len(close) - INTERVAL_LENGTH):
    for j in xrange(i+1, i+INTERVAL_LENGTH):
        ret = close[j] / close[i]
        vol = sum( volume[i+1:j+1] )
        if ret > 1.0001 and ret < 1.5 and vol > 100:
            results.append( [i, j, ret, vol] )
print results

1
你的计算看起来非常简单,为什么不使用Cython? - Tarantula
@Tarantula 在Cython中,解决方案会是什么样子? - miguelmorin
3个回答

7
更新:下面的“new_function2”几乎完全矢量化版本...
稍后我会添加注释来解释其中的内容。
它可以提供约50倍的加速,如果您接受输出为numpy数组而不是列表,则可能会获得更大的加速。目前如此:
In [86]: %timeit new_function2(close, volume, INTERVAL_LENGTH)
1 loops, best of 3: 1.15 s per loop

你可以使用np.cumsum()函数替换内部循环...请参见下面的"new_function"函数。这将大大加快速度...
In [61]: %timeit new_function(close, volume, INTERVAL_LENGTH)
1 loops, best of 3: 15.7 s per loop

对比

In [62]: %timeit old_function(close, volume, INTERVAL_LENGTH)
1 loops, best of 3: 53.1 s per loop

完全可以将整个过程向量化,避免使用for循环。请给我一分钟时间,我来看看能做些什么...

import numpy as np

ARRAY_LENGTH = 500000
INTERVAL_LENGTH = 15
close = np.arange(ARRAY_LENGTH, dtype=np.float)
volume = np.arange(ARRAY_LENGTH, dtype=np.float)

def old_function(close, volume, INTERVAL_LENGTH):
    results = []
    for i in xrange(len(close) - INTERVAL_LENGTH):
        for j in xrange(i+1, i+INTERVAL_LENGTH):
            ret = close[j] / close[i]
            vol = sum( volume[i+1:j+1] )
            if (ret > 1.0001) and (ret < 1.5) and (vol > 100):
                results.append( (i, j, ret, vol) )
    return results


def new_function(close, volume, INTERVAL_LENGTH):
    results = []
    for i in xrange(close.size - INTERVAL_LENGTH):
        vol = volume[i+1:i+INTERVAL_LENGTH].cumsum()
        ret = close[i+1:i+INTERVAL_LENGTH] / close[i]

        filter = (ret > 1.0001) & (ret < 1.5) & (vol > 100)
        j = np.arange(i+1, i+INTERVAL_LENGTH)[filter]

        tmp_results = zip(j.size * [i], j, ret[filter], vol[filter])
        results.extend(tmp_results)
    return results

def new_function2(close, volume, INTERVAL_LENGTH):
    vol, ret = [], []
    I, J = [], []
    for k in xrange(1, INTERVAL_LENGTH):
        start = k
        end = volume.size - INTERVAL_LENGTH + k
        vol.append(volume[start:end])
        ret.append(close[start:end])
        J.append(np.arange(start, end))
        I.append(np.arange(volume.size - INTERVAL_LENGTH))

    vol = np.vstack(vol)
    ret = np.vstack(ret)
    J = np.vstack(J)
    I = np.vstack(I)

    vol = vol.cumsum(axis=0)
    ret = ret / close[:-INTERVAL_LENGTH]

    filter = (ret > 1.0001) & (ret < 1.5) & (vol > 100)

    vol = vol[filter]
    ret = ret[filter]
    I = I[filter]
    J = J[filter]

    output = zip(I.flat,J.flat,ret.flat,vol.flat)
    return output

results = old_function(close, volume, INTERVAL_LENGTH)
results2 = new_function(close, volume, INTERVAL_LENGTH)
results3 = new_function(close, volume, INTERVAL_LENGTH)

# Using sets to compare, as the output 
# is in a different order than the original function
print set(results) == set(results2)
print set(results) == set(results3)

3
一种加速方法是删除sum部分,因为在这个实现中,它通过INTERVAL_LENGTH对长度为2的列表进行求和。相反,只需将volume[j+1]添加到循环的上一次迭代中上一个vol的结果中。因此,每次只需添加两个整数,而不是每次求和并切片整个列表。此外,不要从sum(volume[i+1:j+1])开始,只需执行vol = volume[i+1] + volume[j+1],因为您知道这里的初始情况总是只有两个索引。

另一种加速方法是使用.extend而不是.append,因为Python实现中extend运行速度明显更快。

您还可以拆分最终的if语句,以便仅在必要时执行某些计算。例如,您知道if vol <= 100,则不需要计算ret

这并没有完全回答您的问题,但我认为特别是对于求和问题,这些更改应该会带来显着的加速。

编辑-你也不需要len,因为你已经知道列表的长度(除非那只是为了例子)。将其定义为数字而不是len(something)总是更快的。

编辑-实现(未经测试):

ARRAY_LENGTH = 500000
INTERVAL_LENGTH = 15
close = np.array( xrange(ARRAY_LENGTH) )
volume = np.array( xrange(ARRAY_LENGTH) )
close, volume = close.astype('float64'), volume.astype('float64')

results = []
ex = results.extend
for i in xrange(ARRAY_LENGTH - INTERVAL_LENGTH):
    vol = volume[i+1]
    for j in xrange(i+1, i+INTERVAL_LENGTH):
        vol += volume[j+1]
        if vol > 100:
            ret = close[j] / close[i]
            if 1.0001 < ret < 1.5:
                ex( [i, j, ret, vol] )
print results

另一个加速的方法是在循环之前定义extend_results=results.extend",然后在循环内使用extend_results([i, j, ret, vol])以避免查找。但是,在优化时始终要使用timeit`模块进行测量! - ChristopheD
有趣!查找时间通常有多重要?这通常是一个有用的加速,还是更多因为这个特定循环的数量级? - nearlymonolith
2
@Anthony Morelli:本地变量查找比全局或内置变量查找要快得多,因为“编译器”优化函数体,所以本地变量不需要字典查找(也请参见:http://www.python.org/doc/essays/list2str.html)。但是通常情况下,基准测试总是必要的,因为(不成功的)作用域查找时间受要考虑的项目大小等影响。但是对于这么大的循环,我认为这种技术值得考虑(尚未经过测试,但至少应该更快一些)。 - ChristopheD
1
@Anthony Morelli:除了ChristopheD所说的之外,还有:(1)你的所有变量都是全局的;将代码放在一个函数中并调用它(2)使用1.0001 < ret < 1.5(3)results.extend语句缩进错误。 - John Machin
谢谢!我已经做出了更改 - 尽管 1.0001 < ret < 1.5 只是在编译时扩展到 ret > 1.0001 和 ret < 1.5(根据我所读/记住的)。编辑:虽然我猜变量 ret 只需要查找一次,这可能更快。 - nearlymonolith
我非常确定这里的vol计算存在问题。初始代码中,切片是从i+1:j+1,而j从i+1开始。对我来说,这意味着vol应该初始化为0,并且在循环的每次迭代中添加volume[j],而不是volume[j+1] - Justin Peel

1

为什么不尝试将结果生成为一个单一的列表(比追加或扩展更快),像这样:

results = [ t for t in ( (i, j, close[j]/close[i], sum(volume[i+1:j+1]))
                         for i in xrange(len(close)-INT_LEN)
                             for j in xrange(i+1, i+INT_LEN)
                       )
            if t[3] > 100 and 1.0001 < t[2] < 1.5
          ]

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接