如何在Python中加速多个内积计算

17

我有一些简单的代码,它执行以下操作。

它遍历所有可能的长度为n的列表 F,其中每个条目为+1或-1。 对于每个列表,它遍历所有可能的长度为2n的列表 S,其中每个条目为+1或-1,其中的前半部分是后半部分的复制。 该代码计算F与长度为nS的每个子列表的内积。 对于每个F和S,它计算了内积为零直到第一个非零内积的数量。

这是代码:

#!/usr/bin/python

from __future__ import division
import itertools
import operator
import math

n=14
m=n+1
def innerproduct(A, B):
    assert (len(A) == len(B))
    s = 0 
    for k in xrange(0,n):
        s+=A[k]*B[k]
    return s

leadingzerocounts = [0]*m
for S in itertools.product([-1,1], repeat = n):
    S1 = S + S
    for F in itertools.product([-1,1], repeat = n):
        i = 0
        while (i<m):
            ip = innerproduct(F, S1[i:i+n])
            if (ip == 0):
                leadingzerocounts[i] +=1
                i+=1
            else:
                break

print leadingzerocounts

n=14 的正确输出为

[56229888, 23557248, 9903104, 4160640, 1758240, 755392, 344800, 172320, 101312, 75776, 65696, 61216, 59200, 59200, 59200]

使用pypy,n = 14时需要1分18秒。不幸的是,我真的想要运行它的16、18、20、22、24、26。如果有可能,我不介意使用numba或cython,但我希望尽可能接近Python。

非常感谢任何帮助加速这个程序。


我会在这里记录最快的解决方案。(如果我错过了更新的答案,请告诉我。)

  • Eisenstat(C)以9m35.081s获得n = 22
  • Eisenstat(pypy)以1m16.344s获得n = 18
  • Tupteq(pypy)以2m54.998s获得n = 18
  • Neil(numpy)以26秒获得n = 14
  • kslote1(pypy)以11m59.192s获得n = 14

你尝试过使用Numpy多维数组吗? - tom
3
可能没有机会添加代码,但是需要注意到 IP(A,B) = IP(A[:n/2 + 1], B[:n/2 + 1]) + IP(A[n/2 + 1:], B[n/2 + 1:]) 这一点,可以基于 subset sum 中使用的类似技术进行改进。这应该可以实现 O(2^N) 的算法而不是 O(2^(2N)),尽管可能需要 O(2^N) 的空间。这利用了查找所有大小为N / 2的配对的所有IP地址(其中有O(2^N)个),然后使用它来构建解集。可以使用图来处理while循环中找到的状态转换。 - Nuclearman
经过一些测试,上述方法可能不太实用。问题在于处理状态转换似乎需要分支,这会引入之前已经消除的数字以及重复项。基本上,我编写的算法在第二个(i=2及以后)之后给出了不正确的计数,并且仅仅删除重复项是不足以修复它的,尽管它有很大帮助,这表明这种方法可能存在缺陷,至少在获得O(2^N)空间/时间性能方面如此。 - Nuclearman
@Nuclearman 我得承认,我觉得这很惊讶。 - Simd
你可以自己尝试一下。IP匹配部分足够简单,并且非常快速地获取第一个计数。我无法正确处理转移的批处理,也不确定是否可能。我可能不会尝试实现算法的正确解决方案,因为除非它是O(2^N),但我认为这很不可能,否则它很可能不会比David Eisenstat的答案更好。 - Nuclearman
5个回答

22

这段新代码利用了问题的循环对称性,使速度提高了一个数量级。Python版本使用Duval算法列举项链; C版本使用暴力方法。两者都包含下面描述的加速功能。在我的计算机上,C版本可以在100秒内解决n = 20!简单的估算表明,如果你让它在一个单核心上运行一周,它就能做到n = 26,并且如下所述,它适合并行处理。

import itertools


def necklaces_with_multiplicity(n):
    assert isinstance(n, int)
    assert n > 0
    w = [1] * n
    i = 1
    while True:
        if n % i == 0:
            s = sum(w)
            if s > 0:
                yield (tuple(w), i * 2)
            elif s == 0:
                yield (tuple(w), i)
        i = n - 1
        while w[i] == -1:
            if i == 0:
                return
            i -= 1
        w[i] = -1
        i += 1
        for j in range(n - i):
            w[i + j] = w[j]


def leading_zero_counts(n):
    assert isinstance(n, int)
    assert n > 0
    assert n % 2 == 0
    counts = [0] * n
    necklaces = list(necklaces_with_multiplicity(n))
    for combo in itertools.combinations(range(n - 1), n // 2):
        for v, multiplicity in necklaces:
            w = list(v)
            for j in combo:
                w[j] *= -1
            for i in range(n):
                counts[i] += multiplicity * 2
                product = 0
                for j in range(n):
                    product += v[j - (i + 1)] * w[j]
                if product != 0:
                    break
    return counts


if __name__ == '__main__':
    print(leading_zero_counts(12))

C语言版本:

#include <stdio.h>

enum {
  N = 14
};

struct Necklace {
  unsigned int v;
  int multiplicity;
};

static struct Necklace g_necklace[1 << (N - 1)];
static int g_necklace_count;

static void initialize_necklace(void) {
  g_necklace_count = 0;
  for (unsigned int v = 0; v < (1U << (N - 1)); v++) {
    int multiplicity;
    unsigned int w = v;
    for (multiplicity = 2; multiplicity < 2 * N; multiplicity += 2) {
      w = ((w & 1) << (N - 1)) | (w >> 1);
      unsigned int x = w ^ ((1U << N) - 1);
      if (w < v || x < v) goto nope;
      if (w == v || x == v) break;
    }
    g_necklace[g_necklace_count].v = v;
    g_necklace[g_necklace_count].multiplicity = multiplicity;
    g_necklace_count++;
   nope:
    ;
  }
}

int main(void) {
  initialize_necklace();
  long long leading_zero_count[N + 1];
  for (int i = 0; i < N + 1; i++) leading_zero_count[i] = 0;
  for (unsigned int v_xor_w = 0; v_xor_w < (1U << (N - 1)); v_xor_w++) {
    if (__builtin_popcount(v_xor_w) != N / 2) continue;
    for (int k = 0; k < g_necklace_count; k++) {
      unsigned int v = g_necklace[k].v;
      unsigned int w = v ^ v_xor_w;
      for (int i = 0; i < N + 1; i++) {
        leading_zero_count[i] += g_necklace[k].multiplicity;
        w = ((w & 1) << (N - 1)) | (w >> 1);
        if (__builtin_popcount(v ^ w) != N / 2) break;
      }
    }
  }
  for (int i = 0; i < N + 1; i++) {
    printf(" %lld", 2 * leading_zero_count[i]);
  }
  putchar('\n');
  return 0;
}

通过利用符号对称性(4x)和仅迭代那些通过第一个内积测试的向量(渐进地,O(sqrt(n))x),您可以获得一点加速。

import itertools


n = 10
m = n + 1


def innerproduct(A, B):
    s = 0
    for k in range(n):
        s += A[k] * B[k]
    return s


leadingzerocounts = [0] * m
for S in itertools.product([-1, 1], repeat=n - 1):
    S1 = S + (1,)
    S1S1 = S1 * 2
    for C in itertools.combinations(range(n - 1), n // 2):
        F = list(S1)
        for i in C:
            F[i] *= -1
        leadingzerocounts[0] += 4
        for i in range(1, m):
            if innerproduct(F, S1S1[i:i + n]):
                break
            leadingzerocounts[i] += 4
print(leadingzerocounts)

为了了解我们失去了多少性能,可以使用 C 版本来感受一下(对于 PyPy 的 16 约等于 C 的 18):

#include <stdio.h>

enum {
  HALFN = 9,
  N = 2 * HALFN
};

int main(void) {
  long long lzc[N + 1];
  for (int i = 0; i < N + 1; i++) lzc[i] = 0;
  unsigned int xor = 1 << (N - 1);
  while (xor-- > 0) {
    if (__builtin_popcount(xor) != HALFN) continue;
    unsigned int s = 1 << (N - 1);
    while (s-- > 0) {
      lzc[0]++;
      unsigned int f = xor ^ s;
      for (int i = 1; i < N + 1; i++) {
        f = ((f & 1) << (N - 1)) | (f >> 1);
        if (__builtin_popcount(f ^ s) != HALFN) break;
        lzc[i]++;
      }
    }
  }
  for (int i = 0; i < N + 1; i++) printf(" %lld", 4 * lzc[i]);
  putchar('\n');
  return 0;
}

这个算法是尴尬地并行的,因为它只是在所有xor的值上进行累积。对于C语言版本,根据简单估算,几千个CPU小时就足以计算n=26,在EC2的当前费率下大约需要几百美元。毫无疑问还有一些优化可以进行(例如向量化),但对于像这样的一次性任务,我不确定还有多少程序员的努力是值得的。


谢谢,这确实加快了速度。使用您的方法我可以得到n=16。 - Simd
我必须承认,我不明白为什么这个答案没有得到更多的赞。有时候SO真是个谜。 - Simd
1
@user2179021 不用担心。我写这个答案的时候非常开心。 - David Eisenstat

7

一个非常简单的n倍加速方法是改变这段代码:

def innerproduct(A, B):
    assert (len(A) == len(B))
    for j in xrange(len(A)):
        s = 0 
        for k in xrange(0,n):
            s+=A[k]*B[k]
    return s

to

def innerproduct(A, B):
    assert (len(A) == len(B))
    s = 0 
    for k in xrange(0,n):
        s+=A[k]*B[k]
    return s

(我不知道为什么你需要循环j,但它只是每次都做同样的计算,因此是不必要的。)

2
谢谢,那只是一个bug!由于您回答得如此迅速,如果您不介意的话,我将修复这个问题。 - Simd

2

我尝试将这个转换成NumPy数组,并借鉴了这个问题:itertools product speed up

这是我得到的结果,(这里可能有更多的速度提升):

def find_leading_zeros(n):
    if n % 2:
        return numpy.zeros(n)
    m = n+1
    leading_zero_counts = numpy.zeros(m)
    product_list = [-1, 1]
    repeat = n
    s = (numpy.array(product_list)[numpy.rollaxis(numpy.indices((len(product_list),) * repeat),
                                                  0, repeat + 1).reshape(-1, repeat)]).astype('int8')
    i = 0
    size = s.shape[0] / 2
    products = numpy.zeros((size, size), dtype=bool)
    while i < m:
        products += (numpy.tensordot(s[0:size, 0:size],
                                     numpy.roll(s, i, axis=1)[0:size, 0:size],
                                     axes=(-1,-1))).astype('bool')
        leading_zero_counts[i] = (products.size - numpy.sum(products)) * 4
        i += 1

    return leading_zero_counts

当 n=14 时,我的运行结果如下:
>>> find_leading_zeros(14)
array([ 56229888.,  23557248.,   9903104.,   4160640.,   1758240.,
        755392.,    344800.,    172320.,    101312.,     75776.,
        65696.,     61216.,     59200.,     59200.,     59200.])

所以一切看起来都很好。至于速度:

>>> timeit.timeit("find_leading_zeros_old(10)", number=10)
28.775046825408936
>>> timeit.timeit("find_leading_zeros(10)", number=10)
2.236745834350586

看看你的想法。

编辑:

原始版本在N=14时使用2074MB内存,因此我已删除连接数组并改用numpy.roll。并且将数据类型更改为使用布尔数组,将内存降至277MB,用于n=14。

从时间上看,编辑再次略快一些:

>>> timeit.timeit("find_leading_zeros(10)", number=10)
1.3816070556640625

编辑2:

好的,正如David所指出的那样,加入对称性后我再次进行了优化。现在它只使用了213MB。与之前的编辑相比,比较时间如下:

>>> timeit.timeit("find_leading_zeros(10)", number=10)
0.35357093811035156 

我觉得在我的MacBook上用"纯Python"做n=14的情况,只需要14秒就可以完成,这个速度还不错。


很遗憾,对于 n=14,您的解决方案使用了太多的 RAM,我无法进行测试。 - Simd

2

我试图加速这个过程,但是失败了:( 但我会发送代码,它速度更快一些,但对于像n=24这样的值来说仍然不够快。

我的假设

您的列表由值组成,因此我决定使用数字代替列表 - 每个位表示可能值之一:如果设置了位,则表示1,如果将其归零,则表示-1。乘法{-1,1}的唯一可能结果是1-1,因此我使用按位XOR代替乘法。我还注意到有一个对称性,因此您只需要检查可能列表的子集(四分之一),并将结果乘以4(David在他的答案中解释了这一点)。

最后,我将可能操作的结果放入表格中,以消除计算的需要。它需要大量内存,但谁在乎呢(对于n=24,大约为150MB)?

然后@David Eisenstat回答了这个问题:) 所以,我采用了他的代码并进行了基于位的修改。它大约快2-3倍(对于n=16,需要大约30秒,而David的解决方案需要大约90秒),但我认为这仍然不足以获得n=26或更高的结果。

import itertools

n = 16
m = n + 1
mask = (2 ** n) - 1

# Create table of sum results (replaces innerproduct())
tab = []
for a in range(2 ** n):
    s = 0
    for k in range(n):
        s += -1 if a & 1 else 1
        a >>= 1
    tab.append(s)

# Create combination bit masks for combinations
comb = []
for C in itertools.combinations(range(n - 1), n // 2):
    xor = 0
    for i in C:
       xor |= (1 << i)
    comb.append(xor)

leadingzerocounts = [0] * m
for S in xrange(2 ** (n-1)):
    S1 = S + (1 << (n-1))
    S1S1 = S1 + (S1 << n)

    for xor in comb:
        F = S1 ^ xor

        leadingzerocounts[0] += 4
        for i in range(1, m):
            if tab[F ^ ((S1S1 >> i) & mask)]:
                break
            leadingzerocounts[i] += 4

print(leadingzerocounts)

结论

我曾经认为自己发明了一些非常棒的东西,希望这些位操作可以大幅提高速度,但是实际上提升的效果非常小 :(

我认为原因在于Python使用操作符的方式——即使可以通过单个汇编指令完成运算(或逻辑运算),它也会为每个算术(或逻辑)操作调用函数(我希望pypy能够将操作简化到这个级别,但它没有做到)。所以,如果使用C语言(或汇编语言)来实现这个位操作解决方案,它可能表现得很好(也许可以达到n=24)。


降到 C 语言并没有太多额外的影响(请参见我的编辑)。问题在于,每次将 n 增加 2 倍时,工作量大约增长了 16 倍。 - David Eisenstat
所以,使用C代码可以更进一步。也许可以到n=22或24。 - Tupteq
我在使用pypy和你的代码的帮助下成功完成了n = 18。谢谢。 - Simd

0
在我看来,提高性能的好方法是使用Python内置函数。
首先使用map计算条目的乘积:
>>> a =[1,2,3]
>>> b = [4,5,6]
>>>map(lambda x,y : x*y, a , b)
[4, 10, 18]

然后使用 reduce 计算总和:
>>> reduce(lambda v,w: v+w, map(lambda x,y :x*y, a, b))
32

那么你的函数就变成了

def innerproduct(A, B):
    assert (len(A) == len(B))
    return reduce(lambda v,w: v+w, map(lambda x,y :x*y, A, B))

接下来,我们可以将所有的“for循环”替换为生成器,并捕获StopIteration异常。
#!/usr/bin/python

from __future__ import division
import itertools
import operator
import math

n=14
m=n+1
def innerproduct(A, B):
    assert (len(A) == len(B))
    return reduce(lambda v,w: v+w, map(lambda x,y :x*y, A, B))


leadingzerocounts = [0]*m

S_gen = itertools.product([-1,1], repeat = n)

try:
    while(True):
       S = S_gen.next()
       S1 = S + S
       F_gen = itertools.product([-1,1], repeat = n)
       try:
           while(True):
               F = F_gen.next()
               for i in xrange(m):
                   ip = innerproduct(F, S1[i:i+n])
                   if (ip == 0):
                       leadingzerocounts[i] +=1
                       i+=1
                   else:
                      break
       except StopIteration:
           pass

except StopIteration as e:
    print e

print leadingzerocounts

我发现对于较小的n,速度有所提升,但我的计算机没有足够的马力来计算n=14的版本或原始代码。进一步加快速度的方法是将该行进行记忆化:

    F_gen = itertools.product([-1,1], repeat = n)

谢谢你。不幸的是,就像你建议的那样,你的代码对于n = 14来说非常慢。 - Simd

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接