优化“差分函数”的计算

7
我的代码调用了许多“差分函数”来计算“Yin算法”(基频提取器)。
差分函数(论文中的公式6)定义如下:

enter image description here

这是我的差异函数实现方式:
def differenceFunction(x, W, tau_max):
   df = [0] * tau_max
   for tau in range(1, tau_max):
      for j in range(0, W - tau):
          tmp = long(x[j] - x[j + tau])
          df[tau] += tmp * tmp
   return df

例如使用:
x = np.random.randint(0, high=32000, size=2048, dtype='int16')
W = 2048
tau_max = 106
differenceFunction(x, W, tau_max)

有没有一种方法可以优化这个双重循环计算(仅使用Python,最好不使用除Numpy之外的其他库)?
编辑:更改代码以避免索引错误(j循环,@Elliot答案)
编辑2:更改代码以使用x [0](j循环,@hynekcer评论)

1
你想要更快的方法吗?还是在寻找使用矩阵的向量化解决方案? - Bharath M Shetty
1
我正在寻找使用Python最快的方法。 - PatriceG
3
如果你想保留for循环并获得最大速度,其中一种方法是使用numba。你是否接受使用其他库? - Bharath M Shetty
1
@Bharath Shetty。好观点,numba看起来很有前途。我会去看看的。同时,如果有其他库的解决方案,我仍然感兴趣。 - PatriceG
1
你的测试代码将导致越界异常。x有2048个元素,但你访问了第x[2047+105]个元素 - 你打算如何处理? - Martin Bonner supports Monica
显示剩余5条评论
7个回答

6

首先,您应该考虑数组的边界。按照原始编写方式,您的代码会引发“IndexError”异常。 通过将内部循环向量化,您可以获得显着的加速效果。

import numpy as np

# original version
def differenceFunction_2loop(x, W, tau_max):
   df = np.zeros(tau_max, np.long)
   for tau in range(1, tau_max):
      for j in range(0, W - tau): # -tau eliminates the IndexError
          tmp = np.long(x[j] -x[j + tau])
          df[tau] += np.square(tmp)
   return df

# vectorized inner loop
def differenceFunction_1loop(x, W, tau_max):
    df = np.zeros(tau_max, np.long)
    for tau in range(1, tau_max):
        tmp = (x[:-tau]) - (x[tau:]).astype(np.long)
        df[tau] = np.dot(tmp, tmp)
    return df

x = np.random.randint(0, high=32000, size=2048, dtype='int16')
W = 2048
tau_max = 106
twoloop = differenceFunction_2loop(x, W, tau_max)
oneloop = differenceFunction_1loop(x, W, tau_max)

# confirm that the result comes out the same. 
print(np.all(twoloop == oneloop))
# True

现在进行一些基准测试。在 ipython 中,我得到以下结果

In [103]: %timeit twoloop = differenceFunction_2loop(x, W, tau_max)
1 loop, best of 3: 2.35 s per loop

In [104]: %timeit oneloop = differenceFunction_1loop(x, W, tau_max)
100 loops, best of 3: 8.23 ms per loop

因此,速度提高了大约300倍。


注意:向量化是一种技术,当CPU同时处理数据片段时使用。消除范围检查称为迭代分裂。 - egorlitvinenko
1
@egorlitvinenko,NumPy在向量操作上比解释性语言中的循环更快的机制有很多,其中一个小机制是在某些情况下使用矢量寄存器上的SSE指令(流SIMD扩展(单指令多数据))。有用的简单术语“向量化”对应于维基百科上的向量化 - hynekcer
@hynekcer 我会看一下。谢谢你的解释。 - egorlitvinenko

6

编辑:将速度提高到220微秒 - 请查看结尾处的编辑-直接版本

可以通过 自相关函数 或卷积等方式轻松计算所需的计算。维纳 - 金钦定理允许使用两个快速傅里叶变换(FFT)计算自相关,时间复杂度为O(n log n)。 我使用Scipy包中的加速卷积函数fftconvolve。一个好处是可以很容易地在这里解释它的工作原理。一切都是向量化的,在Python解释器级别没有循环。

from scipy.signal import fftconvolve

def difference_by_convol(x, W, tau_max):
    x = np.array(x, np.float64)
    w = x.size
    x_cumsum = np.concatenate((np.array([0.]), (x * x).cumsum()))
    conv = fftconvolve(x, x[::-1])
    df = x_cumsum[w:0:-1] + x_cumsum[w] - x_cumsum[:w] - 2 * conv[w - 1:]
    return df[:tau_max + 1]
  • Elliot的答案中的differenceFunction_1loop函数相比:使用FFT更快:430微秒相对于原来的1170微秒。在tau_max >= 40时开始变得更快。数值精度很高,最高相对误差小于1E-14相对于精确整数结果。(因此可以轻松地舍入到精确的长整数解决方案。)
  • 参数tau_max对算法不重要。它只是最终限制输出。在Python中,将索引从0开始添加一个零元素到输出。
  • 参数W在Python中并不重要。最好检查大小。
  • 数据最初转换为np.float64以防止重复转换。这样可以快一半的速度。任何小于np.int64的类型都是不可接受的,因为会溢出。
  • 所需的差异函数是双能量减去自相关函数。这可以通过卷积来评估:correlate(x, x) = convolve(x, reversed(x)
  • “截至Scipy v0.19,正常的convolve根据哪种方法更快的估计选择该方法或直接方法。”该启发式方法对于这种情况不足够,因为卷积评估的tau远比tau_max多,必须被更快的FFT超过直接方法。
  • 可以通过将答案在matlab中使用FFT计算自相关性重写为Python(在末尾以下)来使用Numpy ftp模块进行计算。我认为上面的解决方案可能更容易理解。

证明:(针对Pythonista :-)

原始的naive实现可以写成:

df = [sum((x[j] - x[j + t]) ** 2   for j in range(w - t))  for t in range(tau_max + 1)]

tau_max < w 时。

根据规则推导得到 (a - b)**2 == a**2 + b**2 - 2 * a * b

df = [  sum(x[j] ** 2 for j in range(w - t))
      + sum(x[j] ** 2 for j in range(t, w))
      - 2 * sum(x[j] * x[j + t] for j in range(w - t))
      for t in range(tau_max + 1)]

通过使用 x_cumsum = [sum(x[j] ** 2 for j in range(i)) for i in range(w + 1)] 可以轻松在线性时间内计算的方法,将前两个元素替换掉。将 sum(x[j] * x[j + t] for j in range(w - t)) 替换为卷积 conv = convolvefft(x, reversed(x), mode='full'),其输出大小为 len(x) + len(x) - 1

df = [x_cumsum[w - t] + x_cumsum[w] - x_cumsum[t]
      - 2 * convolve(x, x[::-1])[w - 1 + t]
      for t in range(tau_max + 1)]

通过向量表达式进行优化:

df = x_cumsum[w:0:-1] + x_cumsum[w] - x_cumsum[:w] - 2 * conv[w - 1:]

每个步骤都可以通过测试数据进行数值测试和比较。
编辑:直接使用Numpy FFT实现解决方案。
def difference_fft(x, W, tau_max):
    x = np.array(x, np.float64)
    w = x.size
    tau_max = min(tau_max, w)
    x_cumsum = np.concatenate((np.array([0.]), (x * x).cumsum()))
    size = w + tau_max 
    p2 = (size // 32).bit_length()
    nice_numbers = (16, 18, 20, 24, 25, 27, 30, 32)
    size_pad = min(x * 2 ** p2 for x in nice_numbers if x * 2 ** p2 >= size)
    fc = np.fft.rfft(x, size_pad)
    conv = np.fft.irfft(fc * fc.conjugate())[:tau_max]
    return x_cumsum[w:w - tau_max:-1] + x_cumsum[w] - x_cumsum[:tau_max] - 2 * conv

这个解决方案比我之前的解决方案更快两倍以上,因为卷积长度被限制在W + tau_max后最近的“好”数字上,并且具有小的质因数,而不是评估完整的2*W。与使用`fftconvolve(x, reversed(x))`时不必两次转换相同的数据。

In [211]: %timeit differenceFunction_1loop(x, W, tau_max)
1.1 ms ± 4.51 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

In [212]: %timeit difference_by_convol(x, W, tau_max)
431 µs ± 5.69 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

In [213]: %timeit difference_fft(x, W, tau_max)
218 µs ± 685 ns per loop (mean ± std. dev. of 7 runs, 1000 loops each)

最新的解决方案在 tau_max >= 20 的情况下比Eliot的difference_by_convol更快。由于开销成本的相似比率,该比率不会很大程度上依赖数据大小。

@PatriceGuyot 添加了一个两倍更快的方法。 - hynekcer

1
与优化算法不同,您可以使用numba.jit优化解释器:
import timeit

import numpy as np
from numba import jit


def differenceFunction(x, W, tau_max):
    df = [0] * tau_max
    for tau in range(1, tau_max):
        for j in range(0, W - tau):
            tmp = int(x[j] - x[j + tau])
            df[tau] += tmp * tmp
    return df


@jit
def differenceFunction2(x, W, tau_max):
    df = np.ndarray(shape=(tau_max,))
    for tau in range(1, tau_max):
        for j in range(0, W - tau):
            tmp = int(x[j] - x[j + tau])
            df[tau] += tmp * tmp
    return df


x = np.random.randint(0, high=32000, size=2048, dtype='int16')
W = 2048
tau_max = 106
differenceFunction(x, W, tau_max)


print('old',
      timeit.timeit('differenceFunction(x, W, tau_max)', 'from __main__ import differenceFunction, x, W, tau_max',
                    number=20) / 20)
print('new',
      timeit.timeit('differenceFunction2(x, W, tau_max)', 'from __main__ import differenceFunction2, x, W, tau_max',
                    number=20) / 20)

结果:

old 0.18265145074453273
new 0.016223197058214667

你可以将算法优化和 numba.jit 结合起来,以获得更好的结果。

1
这里是另一种使用列表推导的方法。它所需时间约为原始函数的十分之一,但无法击败Elliot's answer。不管怎样,我还是提出来了。
import numpy as np
import time

# original version
def differenceFunction_2loop(x, W, tau_max):
   df = np.zeros(tau_max, np.long)
   for tau in range(1, tau_max):
      for j in range(0, W - tau): # -tau eliminates the IndexError
          tmp = np.long(x[j] -x[j + tau])
          df[tau] += np.square(tmp)
   return df

# vectorized inner loop
def differenceFunction_1loop(x, W, tau_max):
    df = np.zeros(tau_max, np.long)
    for tau in range(1, tau_max):
        tmp = (x[:-tau]) - (x[tau:]).astype(np.long)
        df[tau] = np.dot(tmp, tmp)
    return df

# with list comprehension
def differenceFunction_1loop_listcomp(x, W, tau_max):
    df = [sum(((x[:-tau]) - (x[tau:]).astype(np.long))**2) for tau in range(1, tau_max)]
    return [0] + df[:]

x = np.random.randint(0, high=32000, size=2048, dtype='int16')
W = 2048
tau_max = 106

s = time.clock()
twoloop = differenceFunction_2loop(x, W, tau_max)
print(time.clock() - s)

s = time.clock()
oneloop = differenceFunction_1loop(x, W, tau_max)
print(time.clock() - s)

s = time.clock()
listcomprehension = differenceFunction_1loop_listcomp(x, W, tau_max)
print(time.clock() - s)

# confirm that the result comes out the same. 
print(np.all(twoloop == listcomprehension))
# True

性能结果(大约):
differenceFunction_2loop() = 0.47s
differenceFunction_1loop() = 0.003s
differenceFunction_1loop_listcomp() = 0.033s

0

我不知道你如何解决嵌套循环的问题,但对于算术函数,你可以使用numpy库。它比手动操作更快。

import numpy as np
tmp = np.subtract(long(x[j] ,x[j + tau])

0
我会这样做:
>>> x = np.random.randint(0, high=32000, size=2048, dtype='int16')
>>> tau_max = 106
>>> res = np.square((x[tau_max:] - x[:-tau_max]))

然而我相信这不是最快的方式来做到它。

0

我试图理解最快的答案,结果想出了一种更快、更简单的解决方案。

def autocorrelation(x):
    result = np.correlate(x, x, mode='full')
    return result[result.size // 2:]

def difference(x):
    return np.dot(x, x) + (x * x)[::-1].cumsum()[::-1] - 2 * autocorrelation(x)

该解决方案基于YIN论文中定义的差异函数。

%%timeit
difference(frame)

140 µs ± 438 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接