如何提高这个numpy循环的效率?

8
我有一个包含标签的numpy数组。我想根据每个标签的大小和边界框计算一个数字。如何编写更有效率的代码,以便在大型数组(约15000个标签)上使用?
A = array([[ 1, 1, 0, 3, 3],
           [ 1, 1, 0, 0, 0],
           [ 1, 0, 0, 2, 2],
           [ 1, 0, 2, 2, 2]] )

B = zeros( 4 )

for label in range(1, 4):
    # get the bounding box of the label
    label_points = argwhere( A == label )
    (y0, x0), (y1, x1) = label_points.min(0), label_points.max(0) + 1

    # assume I've computed the size of each label in a numpy array size_A
    B[ label ] = myfunc(y0, x0, y1, x1, size_A[label])

在实际使用情况下,A有多大? - Sven Marnach
你是否进行了一些分析以确定哪个语句在拖慢你的速度?也许是函数myfunc,可以通过将y0、x0、y1、x1保存在单独的数组中并脱离循环只调用一次来实现并行化。否则,如果速度真的很重要,你可能需要考虑是否值得编写一些C代码。我发现使用cython处理numpy数组非常舒适。 - David Zwicker
2
我认为每个标签的argwhere调用是致命的。 - ajwood
5个回答

7

我并不能使用一些NumPy向量化函数有效地实现这个,所以也许一个聪明的Python实现会更快。

def first_row(a, labels):
    d = {}
    d_setdefault = d.setdefault
    len_ = len
    num_labels = len_(labels)
    for i, row in enumerate(a):
        for label in row:
            d_setdefault(label, i)
        if len_(d) == num_labels:
            break
    return d

该函数返回一个字典,将每个标签映射到它出现的第一行的索引。将该函数应用于AA.TA[::-1]A.T[::-1]也会给出第一列以及最后一行和列。
如果您更喜欢列表而不是字典,可以使用map(d.get, labels)将字典转换为列表。或者,您可以从一开始就使用NumPy数组代替字典,但这样做会失去尽早离开循环的能力,即在找到所有标签后立即离开循环。
我很感兴趣知道这是否(以及多少)加快了您的代码,但我相信它比您原来的解决方案更快。

这不是最美观的方法,但它绝对有效。我的原始方法运行时间太长了,我甚至没有让它完成(20分钟后就放弃了)。我刚刚运行了你的方法,只用了6分30秒就完成了。 - ajwood
@ajwood:感谢您的反馈。我知道它不太美观,但这是我能想到的最好的简单解决方案。如果您想更快地完成它,我建议您使用Cython实现。 - Sven Marnach

5

算法:

  1. 将数组转换为一维数组
  2. 使用argsort()获取排序索引
  3. 获取一维数组的排序版本作为sorted_A
  4. 使用where()和diff()在sorted_A中找到标签变化的位置
  5. 使用变化的位置和排序索引来获取标签在一维中的原始位置
  6. 从一维位置计算出二维位置

对于大型数组(例如7000 x 9000),可以在30秒内完成计算。

以下是代码:

import numpy as np

A = np.array([[ 1, 1, 0, 3, 3],
           [ 1, 1, 0, 0, 0],
           [ 1, 0, 0, 2, 2],
           [ 1, 0, 2, 2, 2]] )

def label_range(A):
    from itertools import izip_longest
    h, w = A.shape
    tmp = A.reshape(-1)

    index = np.argsort(tmp)
    sorted_A = tmp[index]
    pos = np.where(np.diff(sorted_A))[0]+1
    for p1,p2 in izip_longest(pos,pos[1:]):
        label_index = index[p1:p2]
        y = label_index // w
        x = label_index % w

        x0 = np.min(x)
        x1 = np.max(x)+1
        y0 = np.min(y)
        y1 = np.max(y)+1
        label = tmp[label_index[0]]

        yield label,x0,y0,x1,y1

for label,x0,y0,x1,y1 in label_range(A):
    print "%d:(%d,%d)-(%d,%d)" % (label, x0,y0,x1,y1)

#B = np.random.randint(0, 100, (7000, 9000))
#list(label_range(B))

我不小心给你的帖子点了踩,因为我以为算法出了问题。我不得不进行一个虚拟编辑来解锁投票——改成了点赞。 :) - Sven Marnach

5

另一种方法:

使用bincount()函数获取每行和每列中标签的数量,并将信息保存在rows和cols数组中。

对于每个标签,您只需要在行和列中搜索范围。它比排序更快,在我的电脑上可以在几秒钟内完成计算。

def label_range2(A):
    maxlabel = np.max(A)+1
    h, w = A.shape
    rows = np.zeros((h, maxlabel), np.bool)
    for row in xrange(h):
        rows[row,:] = np.bincount(A[row,:], minlength=maxlabel) > 0

    cols = np.zeros((w, maxlabel), np.bool)
    for col in xrange(w):
        cols[col,:] =np.bincount(A[:,col], minlength=maxlabel) > 0

    for label in xrange(1, maxlabel):
        row = rows[:, label]
        col = cols[:, label]
        y = np.where(row)[0]
        x = np.where(col)[0]
        x0 = np.min(x)
        x1 = np.max(x)+1
        y0 = np.min(y)
        y1 = np.max(y)+1        
        yield label, x0,y0,x1,y1

这看起来非常有前途,我会尽快尝试一下。 - ajwood

1
使用PyPy,您可以直接运行循环而不必担心向量化。它应该会很快。

1

性能瓶颈似乎确实是对argmax的调用。可以通过以下方式更改循环来避免这种情况(仅计算y0,y1,但易于推广到x0,x1):

for label in range(1, 4):
    comp = (A == label)
    yminind = comp.argmax(0)
    ymin = comp.max(0)
    ymaxind = comp.shape[0] - comp[::-1].argmax(0)
    y0 = yminind[ymin].min()
    y1 = ymaxind[ymin].max()

我不确定性能差异的原因,但其中一个原因可能是所有操作(如==argmaxmax)都可以直接从输入数组的形状预分配其输出数组,而这对于argwhere来说是不可能的。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接