如何加速 numpy.unique 并提供计数和重复行索引

6

我试图在一个numpy数组中查找重复的行。以下代码复制了我的数组结构,该数组具有 n 行、m 列和每行 nz 个非零条目:

import numpy as np
import random
import datetime


def create_mat(n, m, nz):
    sample_mat = np.zeros((n, m), dtype='uint8')
    random.seed(42)
    for row in range(0, n):
        counter = 0
        while counter < nz:
            random_col = random.randrange(0, m-1, 1)
            if sample_mat[row, random_col] == 0:
                sample_mat[row, random_col] = 1
                counter += 1
    test = np.all(np.sum(sample_mat, axis=1) == nz)
    print(f'All rows have {nz} elements: {test}')
    return sample_mat

我尝试优化的代码如下:
if __name__ == '__main__':
    threshold = 2
    mat = create_mat(1800000, 108, 8)

    print(f'Time: {datetime.datetime.now()}')
    unique_rows, _, duplicate_counts = np.unique(mat, axis=0, return_counts=True, return_index=True)
    duplicate_indices = [int(x) for x in np.argwhere(duplicate_counts >= threshold)]
    print(f'Time: {datetime.datetime.now()}')

    print(f'Unique rows: {len(unique_rows)} Sample inds: {duplicate_indices[0:5]} Sample counts: {duplicate_counts[0:5]}')
    print(f'Sample rows:')
    print(unique_rows[0:5])

我的输出如下:

All rows have 8 elements: True
Time: 2022-06-29 12:08:07.320834
Time: 2022-06-29 12:08:23.281633
Unique rows: 1799994 Sample inds: [508991, 553136, 930379, 1128637, 1290356] Sample counts: [1 1 1 1 1]
Sample rows:
[[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 1 1 0 0 1 1 0 0 0 1 0 0 0 0 0 0 1 0 1 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 1 1 0 0 0 0 0 1 1 1 1 0 0 0 0 1 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 1 0 0 1 0 0 0 0 1 0 1 0 0 1 0 0 0 1 0 1 0 1 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 1 0 1 1 0 0 0 0 1 1 0 0 0 0 0 0 1 1 0 1 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 1 1 0 0 0 0 1 0 0 0 1 0 1 1 0 0 0 0 0 1 0 0 0 0 1 0]]

我考虑使用numba,但挑战在于它不使用轴参数。同样,将其转换为列表并利用集合是一个选择,但是循环执行重复计数似乎“不符合Python风格”。

由于我需要多次运行此代码(因为我正在修改numpy数组,然后需要重新搜索重复项),所以时间很关键。我还尝试使用多进程处理该步骤,但np.unique似乎是阻塞的(即使我尝试运行多个版本,我仍然被限制在一个线程上,CPU利用率为6%,而其他线程处于空闲状态)。


1
unique_indices 包含非唯一索引,示例代码可写为 np.argwhere(unique_counts >= threshold).ravel() - Michael Szczesny
1
我无法重现多进程行为。使用多进程处理相同的数组会耗尽所有可用的核心np.unique单线程的。然而,加速比仅约为40%。 - Michael Szczesny
4
unique函数在给定axis参数时,会创建一个结构化数组视图(view),将每一行封装为一个记录(record)。然后对这个数组进行unique1d操作,其中排序是非常重要的。实际上,它会将“行”(作为不可变单元)进行排序,并删除相邻的重复项。 - hpaulj
1
使用@hpaulj的见解并通过位打包减少要排序的列数,可以加速np.unique。但是,这仅在列数是无符号整数类型的倍数时才有效。笔记本 - Michael Szczesny
1
@Ali_Sh - numba 的实现可以工作,但在这个例子中比 np.unique 慢了15%。 - Michael Szczesny
显示剩余5条评论
4个回答

6

第一步:位压缩

由于您的矩阵只包含二进制值,可以将位数uint64值中进行紧凑压缩,以实现更高效的排序。这是一个Numba实现:

import numpy as np
import numba as nb

@nb.njit('(uint8[:,::1],)', parallel=True)
def pack_bits(mat):
    n, m = mat.shape
    res = np.zeros((n, (m+63)//64), np.uint64)
    for i in nb.prange(n):
        for bj in range(0, m, 64):
            val = np.uint64(0)
            if bj + 64 <= m:
                # Fast case
                for j in range(64):
                    val += np.uint64(mat[i, bj+j]) << (63 - j)
            else:
                # Slow case (boundary)
                for j in range(m - bj):
                    val += np.uint64(mat[i, bj+j]) << (63 - j)
            res[i, bj//64] = val
    return res

@nb.njit('(uint64[:,::1], int_)', parallel=True)
def unpack_bits(mat, m):
    n = mat.shape[0]
    assert mat.shape[1] == (m+63)//64
    res = np.zeros((n, m), np.uint64)
    for i in nb.prange(n):
        for bj in range(0, m, 64):
            val = np.uint64(mat[i, bj//64])
            if bj + 64 <= m:
                # Fast case
                for j in range(64):
                    res[i, bj+j] = np.uint8((val >> (63 - j)) & 1)
            else:
                # Slow case (boundary)
                for j in range(m - bj):
                    res[i, bj+j] = np.uint8((val >> (63 - j)) & 1)
    return res

np.unique 函数可以在比初始代码中更小的打包数组上调用(除了生成的排序数组是打包的并需要解包)。由于您不需要索引,最好不要计算它。因此,return_index=True 可以被移除。此外,只需解压缩所需的值(解压缩比打包稍微昂贵一些,因为编写大矩阵比读取现有矩阵更昂贵)。

if __name__ == '__main__':
    threshold = 2
    n, m = 1800000, 108
    mat = create_mat(n, m, 8)

    print(f'Time: {datetime.datetime.now()}')
    packed_mat = pack_bits(mat)
    duplicate_packed_rows, duplicate_counts = np.unique(packed_mat, axis=0, return_counts=True)
    duplicate_indices = [int(x) for x in np.argwhere(duplicate_counts >= threshold)]
    print(f'Time: {datetime.datetime.now()}')

    print(f'Duplicate rows: {len(duplicate_rows)} Sample inds: {duplicate_indices[0:5]} Sample counts: {duplicate_counts[0:5]}')
    print(f'Sample rows:')
    print(unpack_bits(duplicate_packed_rows[0:5], m))

步骤二: np.unique 优化

np.unique方法在执行多个耗时的内部排序步骤时效率不高。但是在特定情况下,并非所有排序步骤都是必要的,因此可以进行一些优化。

更有效的实现方式是先对最后一列进行排序,然后对上一列进行排序,以此类推,直到对第一列进行排序,类似于基数排序。请注意,可以使用非稳定的算法对最后一列进行排序(通常更快),但其他列需要使用稳定的算法。该方法仍然不够优化,因为argsort调用速度较慢,而当前实现尚未使用多个线程。不幸的是,Numpy尚未提供任何有效的方法来对2D数组的行进行排序。虽然可以在Numba中重新实现它,但这很麻烦,有点棘手且容易出错。更不用说相比原生C/C++代码,Numba会引入一些额外的开销。排序完成后,可以跟踪和计算唯一/重复的行。以下是一种实现方式:

def sort_lines(mat):
    n, m = mat.shape

    for i in range(m):
        kind = 'stable' if i > 0 else None
        mat = mat[np.argsort(mat[:,m-1-i], kind=kind)]

    return mat

@nb.njit('(uint64[:,::1],)', parallel=True)
def find_duplicates(sorted_mat):
    n, m = sorted_mat.shape
    assert m >= 0

    isUnique = np.zeros(n, np.bool_)
    uniqueCount = 1
    if n > 0:
        isUnique[0] = True
    for i in nb.prange(1, n):
        isUniqueVal = False
        for j in range(m):
            isUniqueVal |= sorted_mat[i, j] != sorted_mat[i-1, j]
        isUnique[i] = isUniqueVal
        uniqueCount += isUniqueVal

    uniqueValues = np.empty((uniqueCount, m), np.uint64)
    duplicateCounts = np.zeros(len(uniqueValues), np.uint64)

    cursor = 0
    for i in range(n):
        cursor += isUnique[i]
        for j in range(m):
            uniqueValues[cursor-1, j] = sorted_mat[i, j]
        duplicateCounts[cursor-1] += 1

    return uniqueValues, duplicateCounts

可以通过使用find_duplicates(sort_lines(packed_mat))替换之前的np.unique调用。

步骤3:基于GPU的np.unique

在使用Numba和Numpy在CPU上实现快速行排序算法不容易时,可以简单地使用CuPy在GPU上进行操作(假设有可用的Nvidia GPU以及已安装CUDA和CuPy)。这种解决方案具有简单和显着更高的效率的优点。以下是一个例子:

import cupy as cp

def cupy_sort_lines(mat):
    cupy_mat = cp.array(mat)
    return cupy_mat[cp.lexsort(cupy_mat.T[::-1,:])].get()

之前的sort_lines调用可以替换为cupy_sort_lines


结果

这是在我的6核i5-9600KF CPU和Nvidia 1660 Super GPU上测试的时间:

Initial version:        15.541 s
Optimized packing:       0.982 s
Optimized np.unique:     0.634 s
GPU-based sorting:       0.143 s   (require a Nvidia GPU)

因此,优化后的基于CPU的版本大约快25倍,而基于GPU的版本则快109倍。请注意,排序在所有版本中都需要花费相当长的时间。还请注意,在基准测试中未包括解压操作(如提供的代码所示)。只有少数行进行解压缩而不是整个数组时,它仅需花费可忽略的时间(在我的机器上大约需要 ~200 毫秒)。这个最后的操作可以进一步优化,但实现将会显著更加复杂。

2

为了分享一个基础的天真解决方案:

版本 1:

def get_duplicate_indexes(data):
    signature_to_indexes = {}

    index = 0
    for row in data:
        key = row.data.tobytes()
        if key not in signature_to_indexes:
            signature_to_indexes[key] = [index]
        else:
            signature_to_indexes[key].append(index)
        index = index + 1

    return [
        indexes
        for indexes in signature_to_indexes.values()
        if len(indexes) > 1
    ]

In [122]: %timeit get_duplicate_indexes(mat)
833 ms ± 5.94 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [125]: %time get_duplicate_indexes(mat)
CPU times: user 1.5 s, sys: 87.1 ms, total: 1.59 s
Wall time: 1.59 s

In [123]: get_duplicate_indexes(mat)
Out[123]:
[[1396, 402980],
 [769782, 1421364],
 [875866, 1476693],
 [892483, 1194500],
 [1230863, 1478688],
 [1311189, 1426136]]

第二版(与@Kelly Bundy讨论后):

def get_duplicate_indexes(data):
    signature_to_indexes = {}
    duplicates = {}

    for index, row in enumerate(data):
        key = row.data.tobytes()
        if key not in signature_to_indexes:
            signature_to_indexes[key] = index
        else:
            indexes = signature_to_indexes[key]
            if isinstance(indexes, int):
                duplicates[key] = signature_to_indexes[key] = [indexes, index]
            else:
                indexes.append(index)
    return list(duplicates.values())

In [146]: %timeit get_duplicate_indexes(mat)
672 ms ± 3.67 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [147]: %time get_duplicate_indexes(mat)
CPU times: user 652 ms, sys: 46.6 ms, total: 699 ms
Wall time: 698 ms

提供信息:在我的机器上,初始版本大约需要14秒。


嗯,为什么%time比%timeit大那么多?哪一个给出了约14秒的结果? - Kelly Bundy
14秒是关于主题发起者的初始版本。关于时间与timeit:我相信这是另一个讨论的话题 :) 但我打赌timeit会禁用gc。 - westandskif
我理解这与OP有关。但是我需要将那些14秒与你的0.8秒或1.6秒进行比较吗? - Kelly Bundy
2
可能是垃圾回收(gc),是的。在这种情况下,gc活动很大程度上是由相关代码本身(由于许多列表)引起的,因此我认为它应该被包括在内。或者,我们可以先仅存储索引本身,并仅为实际重复项创建列表。这将可能进一步加快速度(不仅因为减少了垃圾回收活动)。 - Kelly Bundy
使用 enumerate 会更好。 - Kelly Bundy
@KellyBundy 你说得对 :) 我添加了第二个版本,现在它不再为每一行创建一个列表。 - westandskif

1
如果您不想使用位打包或在多维数组上调用np.unique,也可以创建视图。这比任何其他优化方式都要慢,但更具教育意义:
def create_view(arr): # a, b are arrays
    arr = np.ascontiguousarray(arr)
    void_dt = np.dtype((np.void, arr.dtype.itemsize * arr.shape[1]))
    return arr.view(void_dt).ravel()

def create_mat(n, m, nz):
    rng = np.random.default_rng()
    data = rng.permuted(np.full((n, m), [1] * nz + [0] * (m - nz), dtype=np.uint8), axis=1)
    return data

if __name__ == '__main__':
    mat = create_mat(1800000, 108, 8)
    mat_view = create_view(mat)
    u, idx, counts = np.unique(mat_view, axis=0, return_index=True, return_counts=True)
    duplicate_indices = np.flatnonzero(counts >= 2)

    print(f'Duplicate inds: {duplicate_indices}')
    print(f'Duplicate counts: {counts[duplicate_indices]}')
    print(f'Unique rows: {len(u)}')

请注意,我还修复了一些代码。可以更高效地完成create_mat。运行时间约为3秒钟 :)

0
我建议使用scipy.stats.rankdata,并将您喜欢的轴设置为axis。请注意,如果将method设置为min,则结果数组中的唯一值将给出唯一行的索引。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接