如何在Python中优化矩阵的数学运算

4
我正在尝试缩短一个执行两个矩阵系列计算的函数时间。在搜索过程中,我听说过numpy,但我真的不知道如何将其应用于我的问题。此外,我认为使我的函数变慢的原因之一是有许多点运算符(我在这个this page中听说过)。
这个数学问题对应于二次分配问题的分解:

QAP Factorization

我的代码是:

    delta = 0
    for k in xrange(self._tam):
        if k != r and k != s:
            delta +=
                self._data.stream_matrix[r][k] \
                * (self._data.distance_matrix[sol[s]][sol[k]] - self._data.distance_matrix[sol[r]][sol[k]]) + \
                self._data.stream_matrix[s][k] \
                * (self._data.distance_matrix[sol[r]][sol[k]] - self._data.distance_matrix[sol[s]][sol[k]]) + \
                self._data.stream_matrix[k][r] \
                * (self._data.distance_matrix[sol[k]][sol[s]] - self._data.distance_matrix[sol[k]][sol[r]]) + \
                self._data.stream_matrix[k][s] \
                * (self._data.distance_matrix[sol[k]][sol[r]] - self._data.distance_matrix[sol[k]][sol[s]])
    return delta

在处理大小为20的问题时(20x20的矩阵),需要大约20秒钟,瓶颈在于这个函数。

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
303878   15.712    0.000   15.712    0.000 Heuristic.py:66(deltaC)

我尝试将map应用于for循环,但由于循环体不是函数调用,因此不可能实现。
我该如何减少时间?
编辑1
回答eickenberg的评论: sol是一个排列,例如[1,2,3,4]。当我生成相邻解时调用该函数,因此[1,2,3,4]的相邻值是[2,1,3,4]。我只更改原始排列中的两个位置,然后调用deltaC,它计算交换位置r、s的解的分解(在上面的示例中,r、s = 0,1)。这个排列是为了避免计算相邻解的整个成本。我想我可以将sol[k,r,s]的值存储在本地变量中,以避免在每次迭代中查找其值。我不知道这是否是您在评论中询问的内容。 编辑2
一个最小的工作示例:
import random


distance_matrix = [[0, 12, 6, 4], [12, 0, 6, 8], [6, 6, 0, 7], [4, 8, 7, 0]]
stream_matrix = [[0, 3, 8, 3], [3, 0, 2, 4], [8, 2, 0, 5], [3, 4, 5, 0]]

def deltaC(r, s, S=None):
    '''
    Difference between C with values i and j swapped
    '''

    S = [0,1,2,3]

    if S is not None:
        sol = S
    else:
        sol = S

    delta = 0

    sol_r, sol_s = sol[r], sol[s]

    for k in xrange(4):
        if k != r and k != s:
            delta += (stream_matrix[r][k] \
                * (distance_matrix[sol_s][sol[k]] - distance_matrix[sol_r][sol[k]]) + \
                stream_matrix[s][k] \
                * (distance_matrix[sol_r][sol[k]] - distance_matrix[sol_s][sol[k]]) + \
                stream_matrix[k][r] \
                * (distance_matrix[sol[k]][sol_s] - distance_matrix[sol[k]][sol_r]) + \
                stream_matrix[k][s] \
                * (distance_matrix[sol[k]][sol_r] - distance_matrix[sol[k]][sol_s]))
    return delta


for _ in xrange(303878):
    d = deltaC(random.randint(0,3), random.randint(0,3))
print d

现在我认为更好的选择是使用NumPy。我试过Matrix(),但没有改善性能。
找到的最佳解决方案
嗯,最终我能够通过结合@TooTone的解决方案并将索引存储在集合中来减少时间,以避免if语句。时间从大约18秒降至8秒。这是代码:
def deltaC(self, r, s, sol=None):
    delta = 0
    sol = self.S if sol is None else self.S
    sol_r, sol_s = sol[r], sol[s]

    stream_matrix = self._data.stream_matrix
    distance_matrix = self._data.distance_matrix

    indexes = set(xrange(self._tam)) - set([r, s])

    for k in indexes:
        sol_k = sol[k]
        delta += \
            (stream_matrix[r][k] - stream_matrix[s][k]) \
            * (distance_matrix[sol_s][sol_k] - distance_matrix[sol_r][sol_k]) \
            + \
            (stream_matrix[k][r] - stream_matrix[k][s]) \
            * (distance_matrix[sol_k][sol_s] - distance_matrix[sol_k][sol_r])
    return delta

为了进一步缩短时间,我认为最好的方法是编写一个模块。

1
你应该看一下numpy,它是一个专门为优化数值计算而设计的成熟库。几乎总是比你自己编写的代码更加优化。 - Can Ibanoglu
3
首先尝试将您的NumPy操作向量化是第一步。目前,您的代码对于NumPy来说是最低效的:使用for循环并在每次迭代中查找s,如sol[s],即使它保持不变。在尝试呈现解决方案之前,如果您能告诉我们是否必须针对所有r, s执行此操作以及sol是否为索引的固定排列,则会非常好。如果向量化无法奏效(虽然应该可以),则可以尝试编译数值表达式,例如使用numexpr,但我建议将其留到更后面。 - eickenberg
另外,您能告诉我们您想在哪些维度 r、s 和 k 上使用它吗? - eickenberg
如果self._data.stream_matrixself._data.distance_matrix是numpy矩阵类,那么效率会更高吗? - Alejandro Alcalde
真正有帮助的是,如果您可以在最后添加一个完全可运行的“最小”可执行程序,以便其他人可以运行它,修改它,并检查他们是否获得与您相同的结果(例如,使用此程序获取delta的值)。矩阵不必像20x20那样大,您只需要为矩阵添加示例数据,对于sol,请将对self._tam的引用替换为常量等。 - TooTone
显示剩余8条评论
1个回答

6
在你提供的简单示例中,使用for k in xrange(4):循环体只执行两次(如果r==s),或者三次(如果r!=s)。下面是一个初始的numpy实现,但它的速度慢了很多。Numpy被优化用于对长向量进行计算,如果向量短,开销可能会超过效益。(需要注意的是,在这个公式中,矩阵在不同的维度上被切片,并且以非连续方式索引,这只会使向量化实现更加复杂)。
import numpy as np

distance_matrix_np = np.array(distance_matrix)
stream_matrix_np = np.array(stream_matrix)
n = 4

def deltaC_np(r, s, sol):
    delta = 0
    sol_r, sol_s = sol[r], sol[s]

    K = np.array([i for i in xrange(n) if i!=r and i!=s])

    return np.sum(
        (stream_matrix_np[r,K] - stream_matrix_np[s,K]) \
        *  (distance_matrix_np[sol_s,sol[K]] - distance_matrix_np[sol_r,sol[K]]) + \
        (stream_matrix_np[K,r] - stream_matrix_np[K,s]) \
        * (distance_matrix_np[sol[K],sol_s] - distance_matrix_np[sol[K],sol_r]))

在这个numpy实现中,不是通过for循环遍历K中的元素,而是在numpy中应用于所有K中的元素。此外,请注意您的数学表达式可以简化。左侧括号中的每个术语都是右侧括号中术语的负值。
这也适用于您的原始代码。例如,(self._data.distance_matrix[sol[s]][sol[k]] - self._data.distance_matrix[sol[r]][sol[k]])等于-1乘以(self._data.distance_matrix[sol[r]][sol[k]] - self._data.distance_matrix[sol[s]][sol[k]]),因此您进行了不必要的计算,并且您的原始代码可以优化而不使用numpy。
结果证明,在numpy函数中的瓶颈是看似无害的列表推导式。
K = np.array([i for i in xrange(n) if i!=r and i!=s])

一旦用向量化代码替换这部分代码

if r==s:
    K=np.arange(n-1)
    K[r:] += 1
else:
    K=np.arange(n-2)
    if r<s:
        K[r:] += 1
        K[s-1:] += 1
    else:
        K[s:] += 1
        K[r-1:] += 1

numpy的函数速度要快得多

下面直接展示了运行时间图表(在本答案底部的原始图表是优化numpy函数之前的)。您可以看到,根据矩阵的大小,使用优化后的原始代码或numpy代码都是有道理的。

enter image description here

以下是完整代码,供参考,部分原因是希望其他人能够进一步进行。 (函数deltaC2是您原始代码的优化版本,以考虑数学表达式可以简化的方式。)

def deltaC(r, s, sol):
    delta = 0
    sol_r, sol_s = sol[r], sol[s]
    for k in xrange(n):
        if k != r and k != s:
            delta += \
                stream_matrix[r][k] \
                * (distance_matrix[sol_s][sol[k]] - distance_matrix[sol_r][sol[k]]) + \
                stream_matrix[s][k] \
                * (distance_matrix[sol_r][sol[k]] - distance_matrix[sol_s][sol[k]]) + \
                stream_matrix[k][r] \
                * (distance_matrix[sol[k]][sol_s] - distance_matrix[sol[k]][sol_r]) + \
                stream_matrix[k][s] \
                * (distance_matrix[sol[k]][sol_r] - distance_matrix[sol[k]][sol_s])
    return delta

import numpy as np

def deltaC_np(r, s, sol):
    delta = 0
    sol_r, sol_s = sol[r], sol[s]

    if r==s:
        K=np.arange(n-1)
        K[r:] += 1
    else:
        K=np.arange(n-2)
        if r<s:
            K[r:] += 1
            K[s-1:] += 1
        else:
            K[s:] += 1
            K[r-1:] += 1
    #K = np.array([i for i in xrange(n) if i!=r and i!=s]) #TOO SLOW

    return np.sum(
        (stream_matrix_np[r,K] - stream_matrix_np[s,K]) \
        *  (distance_matrix_np[sol_s,sol[K]] - distance_matrix_np[sol_r,sol[K]]) + \
        (stream_matrix_np[K,r] - stream_matrix_np[K,s]) \
        * (distance_matrix_np[sol[K],sol_s] - distance_matrix_np[sol[K],sol_r]))

def deltaC2(r, s, sol):
    delta = 0
    sol_r, sol_s = sol[r], sol[s]
    for k in xrange(n):
        if k != r and k != s:
            sol_k = sol[k]
            delta += \
                (stream_matrix[r][k] - stream_matrix[s][k]) \
                * (distance_matrix[sol_s][sol_k] - distance_matrix[sol_r][sol_k]) \
                + \
                (stream_matrix[k][r] - stream_matrix[k][s]) \
                * (distance_matrix[sol_k][sol_s] - distance_matrix[sol_k][sol_r])
    return delta


import time

N=200

elapsed1s = []
elapsed2s = []
elapsed3s = []
ns = range(10,410,10)
for n in ns:
    distance_matrix_np=np.random.uniform(0,n**2,size=(n,n))
    stream_matrix_np=np.random.uniform(0,n**2,size=(n,n))
    distance_matrix=distance_matrix_np.tolist()
    stream_matrix=stream_matrix_np.tolist()
    sol  = range(n-1,-1,-1)
    sol_np  = np.array(range(n-1,-1,-1))

    Is = np.random.randint(0,n-1,4)
    Js = np.random.randint(0,n-1,4)

    total1 = 0
    start = time.clock()
    for reps in xrange(N):
        for i in Is:
            for j in Js:
                total1 += deltaC(i,j, sol)
    elapsed1 = (time.clock() - start)
    start = time.clock()

    total2 = 0
    start = time.clock()
    for reps in xrange(N):
        for i in Is:
            for j in Js:
                total2 += deltaC_np(i,j, sol_np)
    elapsed2 = (time.clock() - start)

    total3 = 0
    start = time.clock()
    for reps in xrange(N):
        for i in Is:
            for j in Js:
                total3 += deltaC2(i,j, sol_np)
    elapsed3 = (time.clock() - start)

    print n, elapsed1, elapsed2, elapsed3, total1, total2, total3
    elapsed1s.append(elapsed1)
    elapsed2s.append(elapsed2)
    elapsed3s.append(elapsed3)

    #Check errors of one method against another
    #err = 0
    #for i in range(min(n,50)):
    #    for j in range(min(n,50)):
    #        err += np.abs(deltaC(i,j,sol)-deltaC_np(i,j,sol_np))
    #print err
import matplotlib.pyplot as plt

plt.plot(ns, elapsed1s, label='Original',lw=2)
plt.plot(ns, elapsed3s, label='Optimized',lw=2)
plt.plot(ns, elapsed2s, label='numpy',lw=2)
plt.legend(loc='upper left', prop={'size':16})
plt.xlabel('matrix size')
plt.ylabel('time')
plt.show()

在优化deltaC_np中的列表推导式之前,这里是原始图表。

enter image description here


非常感谢您的出色工作。我正在尝试实现delta_np,但在sol[k]中我得到了“只有一个元素的整数数组可以转换为索引”的错误提示。在调用delta_np之后,使用我的最小示例进行调试,这是一张图片:http://postimg.org/image/p276kbcxf/ - Alejandro Alcalde
也许我把你搞糊涂了,sol必须是解决方案的排列,如果问题的规模为4,则sol必须是长度为4的列表。 - Alejandro Alcalde
@algui91 检查 sol 是否为 numpy 数组。在开发过程中,我遇到了很多类似的错误(如果您遇到严重问题,请先让底部答案中的完整程序正常工作)。尽可能使用 numpy 数组 - 当我删除列表推导时,可以看到加速效果。 - TooTone
最大尺寸将为90x90。 - Alejandro Alcalde
在这种情况下,似乎值得走numpy的路线,至少对于更大的问题,因为从大小约为30开始,numpy的性能更好。 - TooTone
显示剩余4条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接