多个数组中相同元素的快速查找算法

7

我正在寻找加快(或替换)数据分组算法的方法。

我有一个 numpy 数组列表。我想生成一个新的 numpy 数组,使得对于原始数组中相同的每个索引处,此数组的每个元素都是相同的。(对于不是这种情况的地方则不同)

这听起来比较别扭,因此举个例子:

# Test values:
values = [
    np.array([10, 11, 10, 11, 10, 11, 10]),
    np.array([21, 21, 22, 22, 21, 22, 23]),
    ]

# Expected outcome: np.array([0, 1, 2, 3, 0, 3, 4])
#                             *           *

请注意,期望结果中我标记的元素(索引为0和4)具有相同的值(0),因为原始的两个数组也相同(即1021)。索引为3和5的元素 (3) 同理。
算法需要处理任意数量的(大小相等的)输入数组,并且还要返回每个结果数字对应的原始数组的值。 (因此,对于这个例子,“3”指的是(11, 22)。)
以下是我的当前算法:
import numpy as np

def groupify(values):
    group = np.zeros((len(values[0]),), dtype=np.int64) - 1 # Magic number: -1 means ungrouped.
    group_meanings = {}
    next_hash = 0
    matching = np.ones((len(values[0]),), dtype=bool)
    while any(group == -1):
        this_combo = {}

        matching[:] = (group == -1)
        first_ungrouped_idx = np.where(matching)[0][0]

        for curr_id, value_array in enumerate(values):
            needed_value = value_array[first_ungrouped_idx]
            matching[matching] = value_array[matching] == needed_value
            this_combo[curr_id] = needed_value
        # Assign all of the found elements to a new group
        group[matching] = next_hash
        group_meanings[next_hash] = this_combo
        next_hash += 1
    return group, group_meanings

请注意,表达式value_array [matching] == needed_value对于每个单独的索引都会被评估多次,这就是速度慢的原因所在。
我不确定我的算法是否可以进一步加速,但我也不确定它是否是最优算法。有更好的方法吗?
4个回答

3

最终我成功地找到了一种向量化的解决方案!这是一个有趣的问题。问题是我们必须标记从列表的相应数组元素中取出的每对值。然后,我们应该根据它们在其他对中的唯一性来标记每个这样的对。因此,我们可以滥用所有可选参数并最终做一些额外的工作来保持最终输出的顺序,使用np.unique。以下是基本实现分为三个阶段 -

# Stack as a 2D array with each pair from values as a column each.
# Convert to linear index equivalent considering each column as indexing tuple
arr = np.vstack(values)
idx = np.ravel_multi_index(arr,arr.max(1)+1)

# Do the heavy work with np.unique to give us :
#   1. Starting indices of unique elems, 
#   2. Srray that has unique IDs for each element in idx, and 
#   3. Group ID counts
_,unq_start_idx,unqID,count = np.unique(idx,return_index=True, \
                                        return_inverse=True,return_counts=True)

# Best part happens here : Use mask to ignore the repeated elems and re-tag 
# each unqID using argsort() of masked elements from idx
mask = ~np.in1d(unqID,np.where(count>1)[0])
mask[unq_start_idx] = 1
out = idx[mask].argsort()[unqID]

运行时测试

让我们将提议的向量化方法与原始代码进行比较。由于提议的代码仅获取组ID,因此为了公平的基准测试,让我们削减原始代码中未使用的部分。因此,这里是函数定义-

def groupify(values):  # Original code
    group = np.zeros((len(values[0]),), dtype=np.int64) - 1
    next_hash = 0
    matching = np.ones((len(values[0]),), dtype=bool)
    while any(group == -1):

        matching[:] = (group == -1)
        first_ungrouped_idx = np.where(matching)[0][0]

        for curr_id, value_array in enumerate(values):
            needed_value = value_array[first_ungrouped_idx]
            matching[matching] = value_array[matching] == needed_value
        # Assign all of the found elements to a new group
        group[matching] = next_hash
        next_hash += 1
    return group

def groupify_vectorized(values):  # Proposed code
    arr = np.vstack(values)
    idx = np.ravel_multi_index(arr,arr.max(1)+1)
    _,unq_start_idx,unqID,count = np.unique(idx,return_index=True, \
                                        return_inverse=True,return_counts=True)    
    mask = ~np.in1d(unqID,np.where(count>1)[0])
    mask[unq_start_idx] = 1
    return idx[mask].argsort()[unqID]

在具有大型数组的列表上运行时结果 -
In [345]: # Input list with random elements
     ...: values = [item for item in np.random.randint(10,40,(10,10000))]

In [346]: np.allclose(groupify(values),groupify_vectorized(values))
Out[346]: True

In [347]: %timeit groupify(values)
1 loops, best of 3: 4.02 s per loop

In [348]: %timeit groupify_vectorized(values)
100 loops, best of 3: 3.74 ms per loop

你能否再解释一下这里实际发生了什么?特别是:np.ravel_multi_index是做什么的?(文档对我来说不是很清楚。)为什么要在arr.max(1)+1上调用它? - acdr
@acdr 你可以使用_,values_elems = np.unique(input_string,return_inverse=True)来为我们自己创建数字标签,这些标签将对应于values中的元素。在此过程中,您可能需要使用.ravel()和重塑操作。 - Divakar
1
谢谢!vstacking [np.unique(v,return_inverse=True)[1] for v in values] 而不是 values 的确解决了“其他数据类型”的问题。我接受了这个答案(尽管我也喜欢 Eelco 的答案),因为它不需要单独的包,并且 timeit 的结果也很好。 - acdr
请注意,numpy-indexed 还包含许多更高级的功能,这些功能利用了您在此处计算的组标签类型(例如,按组拆分一组项目或对每个组执行缩减)。您的问题是否有某种更大的背景? - Eelco Hoogendoorn
是的,但通常我事先不知道。我向用户公开一个API,让他们为每个组“计算某些内容”,但这种计算方式差异很大。 - acdr
显示剩余2条评论

2

这应该可以正常工作,并且应该更快,因为我们使用了广播和numpy固有的快速布尔比较:

import numpy as np

# Test values:
values = [
    np.array([10, 11, 10, 11, 10, 11, 10]),
    np.array([21, 21, 22, 22, 21, 22, 23]),
    ]
# Expected outcome: np.array([0, 1, 2, 3, 0, 3, 4])

# for every value in values, check where duplicate values occur
same_mask = [val[:,np.newaxis] == val[np.newaxis,:] for val in values]

# get the conjunction of all those tests
conjunction = np.logical_and.reduce(same_mask)

# ignore the diagonal
conjunction[np.diag_indices_from(conjunction)] = False

# initialize the labelled array with nans (used as flag)
labelled = np.empty(values[0].shape)
labelled.fill(np.nan)

# keep track of labelled value
val = 0
for k, row in enumerate(conjunction):
    if np.isnan(labelled[k]):  # this element has not been labelled yet
        labelled[k] = val      # so label it
        labelled[row] = val    # and label every element satisfying the test
        val += 1

print(labelled)
# outputs [ 0.  1.  2.  3.  0.  3.  4.]

在处理这两个数组时,它比您的版本快了约1.5倍,但我认为对于更多的数组来说,速度提升应该会更好。


这是一个聪明的解决方案,我很喜欢,但对于大型数组来说,这对我来说是个问题。 (如果我的每个输入数组的长度为N,则val [:,np.newaxis]部分将创建一个N乘以N的数组。 如果我有一百万个元素,并且每个布尔值使用一个字节(我相信在numpy中是这样的),那么我最终需要一兆字节的内存才能存储这个数组。 :( - acdr
啊,是的,你没有提到任何关于内存问题的事情 :) 速度和内存效率之间总是存在权衡。你的版本更加节约内存,我的应该更快(特别是在处理大数据时)。 (顺便说一句,注意val [:,np.newaxis]并不会导致内存分配(只是一个广播操作),而是随后的==比较强制广播数组实际扩展。不是要挑剔,但也许这指向了进一步可能的优化。) - EelkeSpaak
实际上,如果内存是一个问题,我怀疑你当前的算法已经接近最优了。也许你可以尝试使用np.unique来做些什么?它支持一个return_inverse选项,可能会有用。 - EelkeSpaak

1

numpy_indexed 包含了numpy数组集合操作的一般化变体(免责声明:我是它的作者),可以以优雅和高效(向量化)的方式解决您的问题:

import numpy_indexed as npi
unique_values, labels = npi.unique(tuple(values), return_inverse=True)

上述方法适用于任意类型的组合,但是如果values是许多相同dtype数组的列表,则以下方法将更加高效:
unique_values, labels = npi.unique(np.asarray(values), axis=1, return_inverse=True)

-1

如果我理解正确,您正在尝试根据列哈希值。最好将列本身转换为任意值,然后从中找到哈希值。

因此,您实际上想在list(np.array(values).T)上进行哈希。

这个功能已经内置于Pandas中。您不需要编写它。唯一的问题是它需要一个没有进一步嵌套列表的值列表。在这种情况下,您可以将内部列表转换为string map(str, list(np.array(values).T))并对其进行因式分解!

>>> import pandas as pd
>>> pd.factorize(map(str, list(np.array(values).T)))
(array([0, 1, 2, 3, 0, 3, 4]),
 array(['[10 21]', '[11 21]', '[10 22]', '[11 22]', '[10 23]'], dtype=object))

我已经将您的数组列表转换为一个数组,然后再转换为字符串...

据我所知,pandas.factorize 只适用于一维数组。您将每个“行”转换为字符串,但对于大型数组来说,这将非常缓慢。 - acdr
这是正确的。然而,我不确定在原生Python中是否有任何实现能够击败Pandas。也许我们可以使用Cython来加速原生Python代码。这一切都取决于实际数组列表的大小... - ssm
取决于唯一组合的数量与每个数组的大小相比如何。对于具有少量组合的非常大的数组,我的解决方案将胜过你的解决方案。(只需将我的输入数组复制一百万次。您将调用一百万次 str。) - acdr

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接