使用NumPy进行向量化基数排序 - 能否胜过np.sort?

7

Numpy目前还没有基数排序,所以我想知道是否可以使用现有的numpy函数编写一个基数排序。到目前为止,我已经编写了以下代码,虽然可以工作,但比numpy的快排慢大约10倍。

line profiler output

测试和基准测试:

a = np.random.randint(0, 1e8, 1e6)
assert(np.all(radix_sort(a) == np.sort(a))) 
%timeit np.sort(a)
%timeit radix_sort(a)
mask_b循环可以部分向量化,从&中广播出遮罩,并使用带有axis参数的cumsum,但这最终会导致性能下降,可能是由于内存占用增加所致。
如果有人能够改进我所做的工作,即使它仍然比np.sort慢...我也会感兴趣,这更多的是出于智力好奇和对numpy技巧的兴趣。
请注意,您可以轻松实现快速计数排序,尽管这仅适用于小整数数据。 编辑1:np.arange(n)从循环中取出可以稍微提高性能,但这并不是很令人兴奋。 编辑2:cumsum实际上是多余的(哎呀!),但这个更简单的版本只能在性能方面略微提高。
def radix_sort(a):
    bit_len = np.max(a).bit_length()
    n = len(a)
    cached_arange = arange(n)
    idx = np.empty(n, dtype=int) # fully overwritten each iteration
    for mask_b in xrange(bit_len):
        is_one = (a & 2**mask_b).astype(bool)
        n_ones = np.sum(is_one)      
        n_zeros = n-n_ones
        idx[~is_one] = cached_arange[:n_zeros]
        idx[is_one] = cached_arange[:n_ones] + n_zeros
        # next three lines just do: a[idx] = a, but correctly
        new_a = np.empty(n, dtype=a.dtype)
        new_a[idx] = a
        a = new_a
    return a

编辑 3:与其循环单个位,如果您在多个步骤中构建 idx,则可以一次循环两个或更多位。使用 2 位有所帮助,我尚未尝试更多:

idx[is_zero] = np.arange(n_zeros)
idx[is_one] = np.arange(n_ones)
idx[is_two] = np.arange(n_twos)
idx[is_three] = np.arange(n_threes)

第四次和第五次修改:对于我正在测试的输入来说,将位数设置为4位似乎是最好的选择。此外,您可以完全摆脱idx步骤。现在,与np.sort相比,速度只慢了约5倍,而不是10倍(源代码可在gist中获取):

enter image description here

编辑 6: 这是上面内容的整理版,但速度稍微有点慢。80% 的时间都花在了 repeatextract 上 - 如果有一种方式可以广播 extract 就好了 :( ...

def radix_sort(a, batch_m_bits=3):
    bit_len = np.max(a).bit_length()
    batch_m = 2**batch_m_bits
    mask = 2**batch_m_bits - 1
    val_set = np.arange(batch_m, dtype=a.dtype)[:, nax] # nax = np.newaxis
    for _ in range((bit_len-1)//batch_m_bits + 1): # ceil-division
        a = np.extract((a & mask)[nax, :] == val_set,
                        np.repeat(a[nax, :], batch_m, axis=0))
        val_set <<= batch_m_bits
        mask <<= batch_m_bits
    return a
编辑7和8:实际上,您可以使用numpy.lib.stride_tricks中的as_strided广播提取,但在性能方面似乎没有太大帮助:

enter image description here

一开始我觉得这样做是有道理的,因为extract将会迭代整个数组batch_m次,所以CPU请求的缓存行总数与之前相同(只是在整个过程结束时,每个缓存行都被请求了batch_m次)。然而,实际情况extract无法聪明地迭代任意步长的数组,必须在开始之前扩展数组,即重复操作仍然会发生。事实上,经过查看extract的源代码,我现在看到我们可以采取的最佳方法是:

a = a[np.flatnonzero((a & mask)[nax, :] == val_set) % len(a)]

这比extract慢一些。但是,如果len(a)是2的幂,我们可以用& (len(a) - 1)替换昂贵的mod操作,这会比extract版本快一些(现在关于a=randint(0, 1e8, 2**20)大约是np.sort的4.9倍)。我想我们可以通过零填充来使非2次幂长度工作,然后在排序结束时裁剪多余的零...但是,除非长度已经接近2的幂,否则这将是一个悲观的选择。

1
考虑到你是在询问如何优化有效代码,建议你将问题发表在 CodeReview.SE,那里更适合讨论该类问题。 - ali_m
4
这个问题已经收到的答案是很好的例子,说明这个问题更适合在CR上发布;你得到的是建议而不是答案。 - DSM
1
我不太敢推荐 CodeReview,因为在 Stackoverflow 上有更多的 numpy 专家。但是对于编码风格和纯 Python 问题,CR 是不错的选择。 - hpaulj
4
@hpaulj 你知道,发展社区的最佳方式不是回避它。 - Mathieu Guindon
1
scipy.sparse.coo.coo_tocsr 在 C 语言的基数排序中做了你需要的确切循环... 在适当的条件下,可能比 np.sort 更快。 - user2379410
显示剩余14条评论
3个回答

4

我尝试使用Numba来测试基数排序的速度。在使用Numba时,取得良好性能的关键(通常)是写出所有循环,这非常有教育意义。最终我得到了以下代码:

from numba import jit

@jit
def radix_loop(nbatches, batch_m_bits, bitsums, a, out):
    mask = (1 << batch_m_bits) - 1
    for shift in range(0, nbatches*batch_m_bits, batch_m_bits):
        # set bit sums to zero
        for i in range(bitsums.shape[0]):
            bitsums[i] = 0

        # determine bit sums
        for i in range(a.shape[0]):
            j = (a[i] & mask) >> shift
            bitsums[j] += 1

        # take the cumsum of the bit sums
        cumsum = 0
        for i in range(bitsums.shape[0]):
            temp = bitsums[i]
            bitsums[i] = cumsum
            cumsum += temp

        # sorting loop
        for i in range(a.shape[0]):
            j = (a[i] & mask) >> shift
            out[bitsums[j]] = a[i]
            bitsums[j] += 1

        # prepare next iteration
        mask <<= batch_m_bits
        # cant use `temp` here because of numba internal types
        temp2 = a
        a = out
        out = temp2

    return a

从这4个内部循环来看,很容易发现第4个循环使得使用Numpy进行向量化变得困难。

绕过这个问题的一种方法是从Scipy中引入一个特定的C++函数:scipy.sparse.coo.coo_tocsr。它执行与上述Python函数几乎相同的内部循环,因此可以被滥用来编写更快的Python“向量化”基数排序。也许可以尝试以下代码:

from scipy.sparse.coo import coo_tocsr

def radix_step(radix, keys, bitsums, a, w):
    coo_tocsr(radix, 1, a.size, keys, a, a, bitsums, w, w)
    return w, a

def scipysparse_radix_perbyte(a):
    # coo_tocsr internally works with system int and upcasts
    # anything else. We need to copy anyway to not mess with
    # original array. Also take into account endianness...
    a = a.astype('<i', copy=True)
    bitlen = int(a.max()).bit_length()
    radix = 256
    work = np.empty_like(a)
    _ = np.empty(radix+1, int)
    for i in range((bitlen-1)//8 + 1):
        keys = a.view('u1')[i::a.itemsize].astype(int)
        a, work = radix_step(radix, keys, _, a, work)
    return a

编辑:稍微优化了一下函数,详见编辑历史。

LSB基数排序的一个低效之处在于数组需要在内存中完全洗牌多次,这意味着CPU缓存没有得到很好地利用。为了尝试减轻这种影响,可以选择先使用MSB基数排序进行一遍传递,将项目大致放入正确的RAM块中,然后使用LSB基数排序对每个结果组进行排序。以下是其中一种实现:

def scipysparse_radix_hybrid(a, bbits=8, gbits=8):
    """
    Parameters
    ----------
    a : Array of non-negative integers to be sorted.
    bbits : Number of bits in radix for LSB sorting.
    gbits : Number of bits in radix for MSB grouping.
    """
    a = a.copy()
    bitlen = int(a.max()).bit_length()
    work = np.empty_like(a)

    # Group values by single iteration of MSB radix sort:
    # Casting to np.int_ to get rid of python BigInt
    ngroups = np.int_(2**gbits)
    group_offset = np.empty(ngroups + 1, int)
    shift = max(bitlen-gbits, 0)
    a, work = radix_step(ngroups, a>>shift, group_offset, a, work)
    bitlen = shift
    if not bitlen:
        return a

    # LSB radix sort each group:
    agroups = np.split(a, group_offset[1:-1])
    # Mask off high bits to not undo the grouping..
    gmask = (1 << shift) - 1
    nbatch = (bitlen-1) // bbits + 1
    radix = np.int_(2**bbits)
    _ = np.empty(radix + 1, int)
    for agi in agroups:
        if not agi.size:
            continue
        mask = (radix - 1) & gmask
        wgi = work[:agi.size]
        for shift in range(0, nbatch*bbits, bbits):
            keys = (agi & mask) >> shift
            agi, wgi = radix_step(radix, keys, _, agi, wgi)
            mask = (mask << bbits) & gmask
        if nbatch % 2:
            # Copy result back in to `a`
            wgi[...] = agi
    return a

时间(在我的系统上使用最佳性能设置):

def numba_radix(a, batch_m_bits=8):
    a = a.copy()
    bit_len = int(a.max()).bit_length()
    nbatches = (bit_len-1)//batch_m_bits +1
    work = np.zeros_like(a)
    bitsums = np.zeros(2**batch_m_bits + 1, int)
    srtd = radix_loop(nbatches, batch_m_bits, bitsums, a, work)
    return srtd

a = np.random.randint(0, 1e8, 1e6)
%timeit numba_radix(a, 9)
# 10 loops, best of 3: 76.1 ms per loop
%timeit np.sort(a)
#10 loops, best of 3: 115 ms per loop
%timeit scipysparse_radix_perbyte(a)
#10 loops, best of 3: 95.2 ms per loop
%timeit scipysparse_radix_hybrid(a, 11, 6)
#10 loops, best of 3: 75.4 ms per loop

Numba表现得非常好,正如预期的那样。并且通过巧妙地应用现有的C扩展,可以击败numpy.sort。在您已经获得的优化水平上,我认为考虑添加Numpy插件是值得的,但我不会真的认为我的答案中的实现是“向量化”的:大部分工作是在外部专用函数中完成的。
另一个引起我注意的事情是对基数选择的敏感性。对于我尝试的大多数设置,我的实现仍然比numpy.sort慢,因此在实践中需要某种启发式方法来提供全面的良好性能。

我曾经短暂地想过在scipy.sparse中是否有相关的内容,但我自己并不太熟悉它...无论如何,你发现得很好!此外,我认为你肯定是对的,以MSB为基础进行缓存定位是正确的(我一直沉迷于LSB,没有探索其他选择)。也许我会尝试一下。我还不想将其标记为“正确”答案,因为numba解决方案不算,虽然scipy解决方案比我的好,但仍有可能做得更好...而且还有实现np.sort的目标。 - dan-man
@dan-man - 谢谢。将MSB排序加入其中确实有所帮助,但从代码量的角度来看,它的收益递减lol。我认为这也不是“正确”的答案,但至少我在上面工作时过得很愉快 :) - user2379410
我很有兴趣比较两者之间的最坏情况(或“糟糕情况”)表现。 - AndyG

0
我实际上使用Cython创建了一个基数排序。经过我的测试,它比Rust或C/C++中的基数排序实现快约5%。而且它比np.sort快了相当大的一部分(我记得大约快了10%)。 这是链接:https://github.com/Ohmagar/Radix_cython/blob/main/parallel_radix_5.pyx 我做了一些巧妙的事情来减少处理时间,通过按照数字的位数预先对元素进行排序,确保只有在需要排序的位数时才将数字排序到桶中。因此,一个“10”只需要处理两次,而不是8次(如果9_999_999 < max_element < 10_000_000)。 我从头开始用Python构建了一个概念验证,并逐渐对其进行了更多的改进。一旦我无法再获得更多的速度,我就将其原样重写为Cython,并开始进行一些微调。最后一步是对每个“digit_chunk”进行并行处理,这最终使我的实现比任何可比较的方法,尤其是numpy.sort,更快。
我刚刚发现在处理器函数中,通过让预排序也并行进行,可能可以让它更快一些。不知道我怎么会忽略了这一点。
随时欢迎你去查看。

很棒你提出了自己的替代方案。不过那不是问题的关键。请简单回答问题。目标是提供一个软件开发人员可以依赖的解决方案。你的替代方案可以写在评论栏中。 - undefined

0
你能把这个改成每次处理8位的计数/基数排序吗?对于32位无符号整数,创建一个大小为[4][257]的矩阵,用于统计字节字段出现的次数,在要排序的数组上进行一次读取。matrix[][0] = 0,matrix[][1] = 0出现的次数,...。然后将计数转换为索引,其中matrix[][0] = 0,matrix[][1] = 字节数等于0的数量,matrix[][2] = 字节数等于0 + 字节数等于1的数量,...。最后一个计数没有使用,因为那会索引到数组的末尾。然后进行4次基数排序,来回移动数据,原始数组和输出数组之间。每次处理16位需要一个大小为[2][65537]的矩阵,但只需要2次排序。示例C代码:
size_t mIndex[4][257] = {0};            /* index matrix */
size_t i, j, m;
uint32_t u;
uint32_t *pData;                        /* ptr to original array */
uint32_t *pTemp;                        /* ptr to working array */
uint32_t *pSrc;                         /* working ptr */
uint32_t *pDst;                         /* working ptr */
/* n is size of array */
    for(i = 0; i < n; i++){             /* generate histograms */
        u = pData[i];
        for(j = 0; j < 4; j++){
            mIndex[j][1 + (size_t)(u & 0xff)]++; /* note [1 + ... */
            u >>= 8;
        }       
    }
    for(j = 0; j < 4; j++){             /* convert to indices */
        for(i = 1; i < 257; i++){       /* (last count never used) */
            mIndex[j][i] += mIndex[j][i-1]
        }       
    }
    pDst = pTemp;                       /* radix sort */
    pSrc = pData;
    for(j = 0; j < 4; j++){
        for(i = 0; i < count; i++){     /* sort pass */
            u = pSrc[i];
            m = (size_t)(u >> (j<<3)) & 0xff;
        /*  pDst[mIndex[j][m]++] = u;      split into 2 lines */
            pDst[mIndex[j][m]] = u;
            mIndex[j][m]++;
        }
        pTmp = pSrc;                    /* swap ptrs */
        pSrc = pDst;
        pDst = pTmp;
    }

我仍然不知道如何进行向量化处理...尝试使用numpy/matlab编写最后一个循环,而不是使用C语言。 - dan-man
1
@dan-man - 我不确定这个能否向量化,但是想知道传统的基数排序迭代方法是否比你现在使用的更快。 - rcgldr

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接