从2D numpy数组中快速删除特定行值的方法

6

我有一个如下的二维数组:

a = np.array([[25, 83, 18, 71],
       [75,  7,  0, 85],
       [25, 83, 18, 71],
       [25, 83, 18, 71],
       [75, 48,  8, 43],
       [ 7, 47, 96, 94],
       [ 7, 47, 96, 94],
       [56, 75, 50,  0],
       [19, 49, 92, 57],
       [52, 93, 58,  9]])

我想要删除某些特定值的行,例如:

b = np.array([[56, 75, 50,  0], [52, 93, 58,  9], [25, 83, 18, 71]])

这在numpypandas中最有效的方法是什么? 期望输出:
np.array([[75,  7,  0, 85],
       [75, 48,  8, 43],
       [ 7, 47, 96, 94],
       [ 7, 47, 96, 94],
       [19, 49, 92, 57]])

更新

最快的方法是降维,但通常需要对列的范围有相当严格的限制。这里是我的 perfplot:

import pandas as pd
import numexpr as ne
import perfplot
from time import time

def remove_pd(data):
    a,b = data
    dfa, dfb = pd.DataFrame(a), pd.DataFrame(b)
    return dfa.merge(dfb, how='left', indicator=True)\
    .query('_merge == "left_only"').drop(columns='_merge').values
    
def remove_smalldata(data):
    a,b = data
    return a[(a[None,:,:] != b[:,None,:]).any(-1).all(0)]

'''def remove_nploop(data):
    a, b = data
    for arr in b:
        a = a[np.all(~np.equal(a, arr), axis=1)]
    return a'''
        
def remove_looped(data): 
    a, b = data
    to_remain = [True]*len(a)
    ind = 0
    for vec_a in a:
        for vec_b in b:
            if np.array_equal(vec_a, vec_b):
                to_remain[ind] = False
                break
        ind += 1
    return a[to_remain]

def remove_looped_boost(data): 
    a, b = data
    to_remain = [True]*len(a)
    a_map = list(map(tuple, a.tolist()))
    b_map = set(map(tuple, b.tolist()))
    for i in range(len(a)):
        to_remain[i] = not(a_map[i] in b_map)
    return a[to_remain]

def remove_reducedim(data):
    a,b = data
    a, b = a.astype(np.int64), b.astype(np.int64) #make sure box is not too small
    ma, MA = np.min(a, axis=0), np.max(a, axis=0)
    mb, MB = np.min(b, axis=0), np.max(b, axis=0)
    m, M = np.min([ma, mb], axis=0), np.max([MA, MB],axis=0)
    ravel_a = np.ravel_multi_index((a-m).T, M - m + 1)
    ravel_b = np.ravel_multi_index((b-m).T, M - m + 1)
    return a[~np.isin(ravel_a, ravel_b)]

def remove_reducedim_boost(data):
    a,b = data
    a, b = a.astype(np.int64), b.astype(np.int64) #make sure box is not too small
    ma, MA = np.min(a, axis=0), np.max(a, axis=0)
    mb, MB = np.min(b, axis=0), np.max(b, axis=0)
    m1,m2,m3,m4 = np.min([ma, mb], axis=0)
    M1,M2,M3,M4 = np.max([MA, MB], axis=0)
    s1,s2,s3,s4 = M1-m1+1, M2-m2+1, M3-m3+1, M4-m4+1
    a1,a2,a3,a4 = a.T
    b1,b2,b3,b4 = b.T
    d = {'a1':a1, 'a2':a2, 'a3':a3, 'a4':a4, 'b1':b1, 'b2':b2, 'b3':b3, 'b4':b4,
        's1':s1, 's2':s2, 's3':s3, 'm1':m1, 'm2':m2, 'm3':m3, 'm4':m4}
    ravel_a = ne.evaluate('(a1-m1)+(a2-m2)*s1+(a3-m3)*s1*s2+(a4-m4)*s1*s2*s3',d)
    ravel_b = ne.evaluate('(b1-m1)+(b2-m2)*s1+(b3-m3)*s1*s2+(b4-m4)*s1*s2*s3',d)
    return a[~np.isin(ravel_a, ravel_b)]
    
def setup(x):
    a1 = np.random.randint(50000, size=(x,4))
    a2 = a1[np.random.randint(x, size=x)]
    return a1, a2
    
def build_args(figure):
    kernels = [remove_reducedim, remove_reducedim_boost, remove_pd, remove_looped, remove_looped_boost, remove_smalldata]
    return {'setup': setup,
    'kernels': {'A': kernels, 'B': kernels[:3]}[figure],
    'n_range': {'A': [2 ** k for k in range(12)], 'B': [2 ** k for k in range(11, 25)]}[figure],
     'xlabel': 'Remowing n rows from n rows',
     'title' : {'A':'Testing removal of small dataset', 'B':'Testing removal of large dataset'}[figure],
     'show_progress': False,
     'equality_check': lambda x,y: np.array_equal(x, y)}
    
t = time()
outs = [perfplot.bench(**build_args(n)) for n in ('A','B')]
fig = plt.figure(figsize=(20, 20))
for i in range(len(outs)):
    ax = fig.add_subplot(2, 1, i+1)
    ax.grid(True, which="both")
    outs[i].plot()
plt.show()
print('Overall testing time:', time()-t)

输出:

Overall testing time: 529.2596168518066

enter image description here

enter image description here


如果a中有多行与b中的某一行相同,那么它们都应该被删除吗? - GZ0
1
@GZ0。是的,我的示例中有说明。 - mathfux
7个回答

7

这里有一种使用 pandas 的方法,通过使用 mergequery 实现"反向连接(anti-join)"。

dfa = pd.DataFrame(a)
dfb = pd.DataFrame(b)

df = (
    dfa.merge(dfb, how='left', indicator=True)
    .query('_merge == "left_only"')
    .drop(columns='_merge')
)

    0   1   2   3
1  75   7   0  85
4  75  48   8  43
5   7  47  96  94
6   7  47  96  94
8  19  49  92  57

注意:使用普通的numpy解决方案可能更快,但这个方法也可以胜任。
使用单循环的普通numpy
for arr in b:
    a = a[np.all(~np.equal(a, arr), axis=1)]

array([[75,  7,  0, 85],
       [75, 48,  8, 43],
       [ 7, 47, 96, 94],
       [ 7, 47, 96, 94],
       [19, 49, 92, 57]])

谢谢。最好有多个版本,包括为了perfplot或其他方式计时。 - mathfux
所以,如果不应用降维技术,pandas 是所有解决方案中表现最好的。如果整数较大,它可能是最快的解决方案。 - mathfux

3

方法 #1: 视图 + searchsorted

使用数组视图的一种方法是将每行视为标量 -

# https://dev59.com/tqPia4cB1Zd3GeqP3cEw#45313353/ @Divakar
def view1D(a, b): # a, b are arrays
    a = np.ascontiguousarray(a)
    b = np.ascontiguousarray(b)
    void_dt = np.dtype((np.void, a.dtype.itemsize * a.shape[1]))
    return a.view(void_dt).ravel(),  b.view(void_dt).ravel()

A,B = view1D(a,b)

sidx = B.argsort()
idx = np.searchsorted(B, A, sorter=sidx)
idx[idx==len(B)] = 0
out = a[B[sidx[idx]] != A]

一种变体是对 B 进行排序 -
Bs = np.sort(B)
idx = np.searchsorted(Bs, A)
idx[idx==len(B)] = 0
out = a[Bs[idx] != A]

另一种变体是在给定B的情况下在A上使用searchsorted函数 -

sidx = A.argsort()
As = A[sidx]
idx = np.searchsorted(A, B, sorter=sidx)

id_ar = np.zeros(len(a), dtype=int)
id_accum = np.cumsum(np.r_[False, As[:-1] != As[1:]])
count = np.bincount(id_accum)
idx_end = idx + count[id_accum[idx]]

id_ar[idx] = 1
id_ar[idx_end] -= 1
out = a[sidx[id_ar.cumsum()==0]]

对于最后一步,如果您需要保持顺序,请使用:

a[np.sort(sidx[id_ar.cumsum()==0])]

方法二:KD树

利用SciPy库的cKDTree实现-

from scipy.spatial import cKDTree

dist,idx = cKDTree(b).query(a,k=1)
out = a[dist!=0]

方法三:使用数组赋值进行掩码操作

对于较小范围的数字,我们可以采用布尔型数组进行赋值:

s = np.maximum(a.max(0)-np.minimum(0,a.min(0)),b.max(0)-np.minimum(0,b.min(0)))+1
mask = np.empty(s, dtype=bool)
mask[tuple(a.T)] = 1
mask[tuple(b.T)] = 0
out = a[mask[tuple(a.T)]]

请注意,这将在四列输入数据集上初始化一个形状为(n,n,n,n)的数组,以覆盖值范围n方法 #4:使用排序
# https://dev59.com/_lcO5IYBdhLWcg3w-V70#44999009/ @Divakar
def view1D_onevar(a): # a is array
    a = np.ascontiguousarray(a)
    void_dt = np.dtype((np.void, a.dtype.itemsize * a.shape[1]))
    return a.view(void_dt).ravel()

def app4(a,b):
    ba = np.vstack((b,a))
    sidx = np.lexsort(ba.T)
    basi = ba[sidx]
    v = view1D_onevar(basi)
    idar = np.r_[False,v[:-1] != v[1:]].cumsum()
    mapar = np.ones(idar[-1]+1, dtype=bool)
    mapar[idar[sidx<len(b)]] = 0
    out = basi[mapar[idar]]
    return out

另外还有两个变体:

def app4_v2(a,b):
    ba = np.vstack((b,a))
    v0 = view1D_onevar(ba)
    sidx = v0.argsort()
    v = v0[sidx]
    idar = np.r_[False,v[:-1] != v[1:]].cumsum()
    mapar = np.ones(idar[-1]+1, dtype=bool)
    mapar[idar[sidx<len(b)]] = 0
    out = a[sidx[mapar[idar]]-len(b)]
    return out

def app4_v3(a,b):
    ba = np.vstack((b,a))
    sidx = view1D_onevar(ba).argsort()
    basi = ba[sidx]
    v = view1D_onevar(basi)
    idar = np.r_[False,v[:-1] != v[1:]].cumsum()
    mapar = np.ones(idar[-1]+1, dtype=bool)
    mapar[idar[sidx<len(b)]] = 0
    out = basi[mapar[idar]]
    return out

请注意,顺序不被保留。 lexsort/argsort 是瓶颈,如果我们愿意进行降维处理,可以对其进行优化。

2
div_app3() 似乎是罪魁祸首。 - Darkonaut
看来我需要在 div_app3() 上退一步,它无法处理超过5列的情况。 - Darkonaut
@Darkonaut,只要你能按照那里指定的方式分配[n]*a.shape[1]布尔形数组,就应该没问题。 - Divakar
由于s试图“分配形状为(83, 93, 75, 75, 88, 100)和数据类型为bool”的数组”,因此即使对于a的(2,6)形状,它也会出现MemoryError错误。因此,为了使其能够处理更多列,您在数组中的数字必须保持很小。 - Darkonaut

2
我能想到的唯一方法是基于降维技术:
def remove(a, b):
    a, b = a.astype(np.int64), b.astype(np.int64) #make sure box is not too small
    ma, MA = np.min(a, axis=0), np.max(a, axis=0)
    mb, MB = np.min(b, axis=0), np.max(b, axis=0)
    m, M = np.min([ma, mb], axis=0), np.max([MA, MB],axis=0)
    ravel_a = np.ravel_multi_index((a-m).T, M - m + 1)
    ravel_b = np.ravel_multi_index((b-m).T, M - m + 1)
    return a[~np.isin(ravel_a, ravel_b)]

由于我们需要做许多基本的代数运算,在这里可以通过使用numexpr来实现一些性能提升:

import numexpr as ne
def remove_boost(a,b):
    a, b = a.astype(np.int64), b.astype(np.int64) #make sure box is not too small
    ma, MA = np.min(a, axis=0), np.max(a, axis=0)
    mb, MB = np.min(b, axis=0), np.max(b, axis=0)
    m1,m2,m3,m4 = np.min([ma, mb], axis=0)
    M1,M2,M3,M4 = np.max([MA, MB], axis=0)
    s1,s2,s3,s4 = M1-m1+1, M2-m2+1, M3-m3+1, M4-m4+1
    a1,a2,a3,a4 = a.T
    b1,b2,b3,b4 = b.T
    d = {'a1':a1, 'a2':a2, 'a3':a3, 'a4':a4, 'b1':b1, 'b2':b2, 'b3':b3, 'b4':b4,
        's1':s1, 's2':s2, 's3':s3, 'm1':m1, 'm2':m2, 'm3':m3, 'm4':m4}
    ravel_a = ne.evaluate('(a1-m1)+(a2-m2)*s1+(a3-m3)*s1*s2+(a4-m4)*s1*s2*s3',d)
    ravel_b = ne.evaluate('(b1-m1)+(b2-m2)*s1+(b3-m3)*s1*s2+(b4-m4)*s1*s2*s3',d)
    return a[~np.isin(ravel_a, ravel_b)]

出乎意料的是,在处理大型数据时,降维是唯一适用于 numpy 的方法来进行多维度的删除 :)

remove(a, b)remove_boost(a, b)输出结果相同:

[[75  7  0 85]
 [75 48  8 43]
 [ 7 47 96 94]
 [ 7 47 96 94]
 [19 49 92 57]]

缺点:它只能处理尺寸不大于2^63的盒子(s1*s2*s3*s4 = np.prod(np.ptp(np.r_[a,b], axis=0)+1)应小于2^63)。


2
如果数据不太大的话,广播是另一个选项:
a[(a[None,:,:] != b[:,None,:]).any(-1).all(0)]

输出:

array([[75,  7,  0, 85],
       [75, 48,  8, 43],
       [ 7, 47, 96, 94],
       [ 7, 47, 96, 94],
       [19, 49, 92, 57]])

看起来在小数据上,一个简单的Python就能胜过它 :( - mathfux

1
这是另一种实现降维的方法。
scales = np.array([1,10,100,1000])
a_scaled = np.sum(a * scales,axis=1)[:,None]
b_scaled = np.sum(b * scales,axis=1)[None,:]
a_slice = a[~np.any(a_scaled == b_scaled,axis=1)]

正如您所看到的,我们将不同比例的不同列的数字相乘,并基本上将此问题转化为比较两个1d数组。

如果列的顺序不重要,您还需要添加 np.sort

scales = np.array([1,10,100,1000])
a_scaled = np.sum(np.sort(a,axis=1) * scales,axis=1)[:,None]
b_scaled = np.sum(np.sort(b,axis=1) * scales,axis=1)[None,:]
a_slice = a[~np.any(a_scaled == b_scaled,axis=1)]

我想这种方法可能存在失败的情况,所以我很乐意听取反馈意见。


1

基准测试文章

我们将遵循原帖作者的策略,将小到适中大小的数据集作为第一阶段进行基准测试,然后在其中选择几个较大的数据集作为第二阶段。此外,我们将保持使用原帖作者基准测试代码中的4列。

  • 第一阶段:数据集大小最多为5000,b的长度为a长度的5%-90%
  • 第二阶段:去掉最后两个数据集,然后再次运行,数据集大小最多为1000000,并且b的百分比与之前相同。

使用benchit包(打包了几个基准测试工具;免责声明:我是其作者)来基准测试所提出的解决方案。

所提出的解决方案:

import numpy as np
import pandas as pd
import numexpr as ne

def remove_pd(a,b):
    dfa, dfb = pd.DataFrame(a), pd.DataFrame(b)
    return dfa.merge(dfb, how='left', indicator=True)\
    .query('_merge == "left_only"').drop(columns='_merge').values
    
def remove_smalldata(a,b):
    return a[(a[None,:,:] != b[:,None,:]).any(-1).all(0)]
        
def remove_looped(a, b): 
    to_remain = [True]*len(a)
    ind = 0
    for vec_a in a:
        for vec_b in b:
            if np.array_equal(vec_a, vec_b):
                to_remain[ind] = False
                break
        ind += 1
    return a[to_remain]

def remove_looped_boost(a, b): 
    to_remain = [True]*len(a)
    a_map = list(map(tuple, a.tolist()))
    b_map = set(map(tuple, b.tolist()))
    for i in range(len(a)):
        to_remain[i] = not(a_map[i] in b_map)
    return a[to_remain]

def remove_reducedim(a, b):
    a, b = a.astype(np.int64), b.astype(np.int64) #make sure box is not too small
    ma, MA = np.min(a, axis=0), np.max(a, axis=0)
    mb, MB = np.min(b, axis=0), np.max(b, axis=0)
    m, M = np.min([ma, mb], axis=0), np.max([MA, MB],axis=0)
    ravel_a = np.ravel_multi_index((a-m).T, M - m + 1)
    ravel_b = np.ravel_multi_index((b-m).T, M - m + 1)
    return a[~np.isin(ravel_a, ravel_b)]

def remove_reducedim_boost(a,b):
    a, b = a.astype(np.int64), b.astype(np.int64) #make sure box is not too small
    ma, MA = np.min(a, axis=0), np.max(a, axis=0)
    mb, MB = np.min(b, axis=0), np.max(b, axis=0)
    m1,m2,m3,m4 = np.min([ma, mb], axis=0)
    M1,M2,M3,M4 = np.max([MA, MB], axis=0)
    s1,s2,s3,_ = M1-m1+1, M2-m2+1, M3-m3+1, M4-m4+1
    a1,a2,a3,a4 = a.T
    b1,b2,b3,b4 = b.T
    d = {'a1':a1, 'a2':a2, 'a3':a3, 'a4':a4, 'b1':b1, 'b2':b2, 'b3':b3, 'b4':b4,
        's1':s1, 's2':s2, 's3':s3, 'm1':m1, 'm2':m2, 'm3':m3, 'm4':m4}
    ravel_a = ne.evaluate('(a1-m1)+(a2-m2)*s1+(a3-m3)*s1*s2+(a4-m4)*s1*s2*s3',d)
    ravel_b = ne.evaluate('(b1-m1)+(b2-m2)*s1+(b3-m3)*s1*s2+(b4-m4)*s1*s2*s3',d)
    return a[~np.isin(ravel_a, ravel_b)]

def view1D(a, b): # a, b are arrays
    a = np.ascontiguousarray(a)
    b = np.ascontiguousarray(b)
    void_dt = np.dtype((np.void, a.dtype.itemsize * a.shape[1]))
    return a.view(void_dt).ravel(),  b.view(void_dt).ravel()

def div_app1(a, b):
    A,B = view1D(a,b)
    sidx = B.argsort()
    idx = np.searchsorted(B, A, sorter=sidx)
    idx[idx==len(B)] = 0
    return a[B[sidx[idx]] != A]

def div_app1_v2(a, b):
    A,B = view1D(a,b)
    Bs = np.sort(B)
    idx = np.searchsorted(Bs, A)
    idx[idx==len(B)] = 0
    return a[Bs[idx] != A]

from scipy.spatial import cKDTree

def div_app2(a, b):
    dist,idx = cKDTree(b).query(a,k=1)
    return a[dist!=0]

def div_app3(a, b):
    s = np.maximum(a.max(0)-np.minimum(0,a.min(0)), b.max(0)-np.minimum(0,b.min(0)))+1
    mask = np.empty(s, dtype=bool)
    mask[tuple(a.T)] = 1
    mask[tuple(b.T)] = 0
    return a[mask[tuple(a.T)]]

def Darkonaut(a, b):
    a_rows = a.view([('', a.dtype)] * a.shape[1])  # '' default fieldname
    b_rows = b.view([('', b.dtype)] * b.shape[1])        
    return np.setdiff1d(a_rows, b_rows, assume_unique=True)

代码基准测试:
def setup(array_len, b_len_percentage):
    a = np.random.randint(0,100,(array_len,4))
    b_len = int(len(a)*b_len_percentage/100.)
    b = np.unique(a[np.random.choice(len(a), b_len, replace=False)], axis=0)
    return a,b

import benchit
benchit.setparams(rep=1) # use rep=2 or bigger if you have time & patience

funcs = [remove_pd, remove_smalldata, remove_looped, remove_looped_boost, 
         remove_reducedim, remove_reducedim_boost, div_app1, div_app1_v2,
         div_app2, div_app3, app4, app4_v2, app4_v3, Darkonaut]
percentages = np.r_[5,list(range(10,100,20))]
in_ = {(n,x):setup(n,x) for n in [100, 500, 1000, 2000, 5000] for x in percentages}
t = benchit.timings(funcs, in_, multivar=True, input_name=['Len_a', 'Len_b_as_Percentage'])
t.plot(logx=True, sp_ncols=2, legend_fontsize=8, rot=90, save='timings1.png',dpi=200)

funcs.remove(remove_smalldata)
funcs.remove(remove_looped)
in_ = {(n,x):setup(n,x) for n in np.linspace(10**4,10**6,5,dtype=int) for x in percentages}
t2 = benchit.timings(funcs, in_, multivar=True, input_name=['Len_a', 'Len_b_as_Percentage'])
t2.plot(logx=True, sp_ncols=2, legend_fontsize=8, rot=90, save='timings2.png',dpi=200)

enter image description here

enter image description here


1
你可以尝试使用循环比较两个矩阵中的向量来解决问题。
代码如下:
import np

a = np.array([[25, 83, 18, 71],
       [75,  7,  0, 85],
       [25, 83, 18, 71],
       [25, 83, 18, 71],
       [75, 48,  8, 43],
       [ 7, 47, 96, 94],
       [ 7, 47, 96, 94],
       [56, 75, 50,  0],
       [19, 49, 92, 57],
       [52, 93, 58,  9]])

b = np.array([[56, 75, 50,  0], [52, 93, 58,  9], [25, 83, 18, 71]])

to_remain = [True]*len(a)

ind = 0

for vec_a in a:
  for vec_b in b:
    if np.array_equal(vec_a, vec_b):
      to_remain[ind] = False
      break
  
  ind += 1

output = a[to_remain]

print(output)

输出

[[75  7  0 85]
 [75 48  8 43]
 [ 7 47 96 94]
 [ 7 47 96 94]
 [19 49 92 57]]

1
这不是高效的,因为它使用了循环。我期望有一些向量化的解决方案。 - mathfux
什么是“向量化”解决方案? - tclarke13
@tclarke13 的向量化解决方案是一种结合了 numpy 操作的方式,它可以替换所有循环或理解操作以在 C 级别上运行(因此速度更快)。您可以在互联网上搜索,因为这个定义可能不是最理想的。 - mathfux
1
@QuantStats 第二个循环对性能来说真的很痛苦。但是你可以使用元组在一组元组中进行搜索,这是即时的。在这种情况下,它比 Quang 的解决方案表现更好(看看我的例子)。 - mathfux
@mathfux 感谢您的评论和花费时间编译所有建议输入的性能时间。这绝对是一次有益的学习。 - QuantStats

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接