矢量化搜索排序的numpy

6
假设我有两个数组A和B,其中A和B都是m x n的。我的目标是,对于A和B的每一行,找到应该将A的第i行元素插入到B的相应行中的位置。也就是说,我希望对A和B的每一行应用np.digitize或np.searchsorted。
我的简单解决方案是简单地遍历这些行。然而,这对我的应用程序来说太慢了。因此,我的问题是:是否有向量化实现这两种算法的方法,我还没有找到?

每行中的A和B元素是否已排序? - Divakar
是的,它们是。我基本上正在实现系统重采样。 - Tingiskhan
如果您展示当前的实现,我们可以指出需要改进的地方。 - Balzola
3个回答

9
我们可以为每一行添加一些偏移量,相对于前一行。我们会使用同样的偏移量来处理这两个数组。其想法是在之后使用np.searchsorted在输入数组的扁平化版本上,因此每一行从b找到的位置都会被限定在相应行的a中。此外,为使其也适用于负数,我们只需为最小数字提供偏移。因此,我们将得到如下矢量化实现 -
def searchsorted2d(a,b):
    m,n = a.shape
    max_num = np.maximum(a.max() - a.min(), b.max() - b.min()) + 1
    r = max_num*np.arange(a.shape[0])[:,None]
    p = np.searchsorted( (a+r).ravel(), (b+r).ravel() ).reshape(m,-1)
    return p - n*(np.arange(m)[:,None])

运行时测试 -

In [173]: def searchsorted2d_loopy(a,b):
     ...:     out = np.zeros(a.shape,dtype=int)
     ...:     for i in range(len(a)):
     ...:         out[i] = np.searchsorted(a[i],b[i])
     ...:     return out
     ...: 

In [174]: # Setup input arrays
     ...: a = np.random.randint(11,99,(10000,20))
     ...: b = np.random.randint(11,99,(10000,20))
     ...: a = np.sort(a,1)
     ...: b = np.sort(b,1)
     ...: 

In [175]: np.allclose(searchsorted2d(a,b),searchsorted2d_loopy(a,b))
Out[175]: True

In [176]: %timeit searchsorted2d_loopy(a,b)
10 loops, best of 3: 28.6 ms per loop

In [177]: %timeit searchsorted2d(a,b)
100 loops, best of 3: 13.7 ms per loop

2
太棒了!非常感谢Divakar - 你的解决方案总是干净而优雅! - Tingiskhan
使用 side 参数等于 'right' 是否会影响结果?我的猜测是不会。 - piRSquared
@piRSquared应该可以将该参数设置为“right”。 - Divakar

3
@Divakar提供的解决方案适用于整数数据,但是对于浮点数值的精度问题要小心,特别是如果它们跨越多个数量级(例如:[[1.0, 2.0, 3.0, 1.0e + 20],...])。在某些情况下,r 可能非常大,以至于应用a+rb+r会抹掉您试图运行searchsorted的原始值,并且您只是将rr进行比较。
为了使方法对浮点数据更加稳健,您可以将行信息作为值的一部分(作为结构化dtype)嵌入到数组中,并在这些结构化dtypes上运行searchsorted。
def searchsorted_2d (a, v, side='left', sorter=None):
  import numpy as np

  # Make sure a and v are numpy arrays.
  a = np.asarray(a)
  v = np.asarray(v)

  # Augment a with row id
  ai = np.empty(a.shape,dtype=[('row',int),('value',a.dtype)])
  ai['row'] = np.arange(a.shape[0]).reshape(-1,1)
  ai['value'] = a

  # Augment v with row id
  vi = np.empty(v.shape,dtype=[('row',int),('value',v.dtype)])
  vi['row'] = np.arange(v.shape[0]).reshape(-1,1)
  vi['value'] = v

  # Perform searchsorted on augmented array.
  # The row information is embedded in the values, so only the equivalent rows 
  # between a and v are considered.
  result = np.searchsorted(ai.flatten(),vi.flatten(), side=side, sorter=sorter)

  # Restore the original shape, decode the searchsorted indices so they apply to the original data.
  result = result.reshape(vi.shape) - vi['row']*a.shape[1]

  return result

编辑:这种方法的时机非常糟糕!

In [21]: %timeit searchsorted_2d(a,b)
10 loops, best of 3: 92.5 ms per loop

你最好只使用map来处理数组:

In [22]: %timeit np.array(list(map(np.searchsorted,a,b)))
100 loops, best of 3: 13.8 ms per loop

对于整数数据,@Divakar的方法仍然是最快的:

In [23]: %timeit searchsorted2d(a,b)
100 loops, best of 3: 7.26 ms per loop

2

我认为@Divakar的解决方案需要以下两点说明(由于声誉问题,我无法添加评论):

  1. 这是一种计算(向量化)优化,而不是算法优化。这意味着在理论上并没有复杂度上的提升。
  2. 实际上,在某些情况下它比原来的方法要慢得多,有时甚至慢了10倍以上。例如,当使用形状为(20, 10000)而不是(10000,20)时,当b.shape[1]远小于a.shape[1]时,或者当数据量过大时,将其转换成一个非常长的行会导致内存效率低下。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接