为每个错误值向右有效地扩展n个单元的numpy掩码。

10

假设我有一个长度为30的数组,其中有4个坏值。我想为这些坏值创建一个掩码(mask),但由于我将使用滚动窗口函数,我还希望在每个坏值之后标记一定数量的相邻索引。在下面的示例中,n = 3:

enter image description here

我希望尽可能高效地完成这个任务,因为这个例程将在包含数十亿数据点的大型数据系列上运行多次。因此,我需要尽可能接近numpy向量化解决方案,因为我想避免使用Python循环。
为了避免重新输入,这里是数组:
import numpy as np
a = np.array([4, 0, 8, 5, 10, 9, np.nan, 1, 4, 9, 9, np.nan, np.nan, 9,\
              9, 8, 0, 3, 7, 9, 2, 6, 7, 2, 9, 4, 1, 1, np.nan, 10])

他显然认真考虑过,因为他画了图形并附带了一小段代码和示例数组。 - Joran Beasley
1
@ Evert:是的,我可以循环遍历每个错误索引值并将[1,2,3]添加到每个值中,获取所有这些值的集合(即唯一值),并将其用作掩码错误值索引。但这需要一个循环。如果我有100k个坏值和1m个数据点,你可以看到我的循环需要很长时间,而且这是在机器学习上下文中,所以会有很多重复的例程。很高兴听到更多关于花式索引的可能性。我猜我可以使用pandas滚动应用程序,但那仍然是一个循环(虽然由C实现可能很快)。有没有纯向量的方法? - Thomas Browne
(据我所知,即使“纯向量”也是在C语言中通过循环实现的:P) - Joran Beasley
@Joran Beasley。实际上,如果您购买了链接了MKL的Numpy版本(Anaconda Accelerate),许多矢量化内容将传递到Intel AVX指令集。在我的i7上,与Ubuntu和OS/X中的非MKL相比,可以将速度提高4-5倍。但是是的...我明白你的意思;-) - Thomas Browne
1
使用不规则间隔的数据,你无法使其“纯粹向量化”(即矩形)。最好的方法是使用编译循环,例如@askewchan的“reduceat”。 - hpaulj
显示剩余4条评论
7个回答

7

又一个答案!
它只需要使用您已经拥有的掩码,并对其进行逻辑或(logical or)操作,然后将其应用于移位版本。这种方法矢量化得很好,速度非常快!:D

def repeat_or(a, n=4):
    m = np.isnan(a)
    k = m.copy()

    # lenM and lenK say for each mask how many
    # subsequent Trues there are at least
    lenM, lenK = 1, 1

    # we run until a combination of both masks will give us n or more
    # subsequent Trues
    while lenM+lenK < n:
        # append what we have in k to the end of what we have in m
        m[lenM:] |= k[:-lenM]

        # swap so that m is again the small one
        m, k = k, m

        # update the lengths
        lenM, lenK = lenK, lenM+lenK

    # see how much m has to be shifted in order to append the missing Trues
    k[n-lenM:] |= m[:-n+lenM]

    return k

很遗憾,我无法使m[i:] |= m[:-i]运行...可能在同时修改和使用掩码自身上不是个好主意。但是m[:-i] |= m[i:]确实可行,只是方向错误了。
无论如何,现在我们拥有类似于斐波那契增长的增长速度,仍然比线性增长更好。
(我从未想过会真正编写与斐波那契数列相关的算法,而不只是一些奇怪的数学问题。)

在具有1e6大小和1e5 NANs的数组下进行"真实"条件测试:

In [5]: a = np.random.random(size=1e6)

In [6]: a[np.random.choice(np.arange(len(a), dtype=int), 1e5, replace=False)] = np.nan

In [7]: %timeit reduceat(a)
10 loops, best of 3: 65.2 ms per loop

In [8]: %timeit index_expansion(a)
100 loops, best of 3: 12 ms per loop

In [9]: %timeit cumsum_trick(a)
10 loops, best of 3: 17 ms per loop

In [10]: %timeit repeat_or(a)
1000 loops, best of 3: 1.9 ms per loop

In [11]: %timeit agml_indexing(a)
100 loops, best of 3: 6.91 ms per loop

我会把后续的基准测试留给托马斯。

1
你已经参加了比赛。跟不上了。明天公布测试结果! - Thomas Browne
我认为你想要在第2行写成 m = np.isnan(a)。 - Thomas Browne
1
当然!:D 这就是当你不使用复制粘贴,只是从记忆中写的时候会发生的事情... - swenzel
这在概念上与@AGML的第二个答案相同,但避免了昂贵的布尔索引。 - askewchan
如果你认为移位掩码是相同的概念,那么是的。但是在我看来,我们的实现方式有很大不同。:) 另外,AGML 不使用布尔索引而是直接索引。他们需要在每次迭代中都增加索引,然后使用它们来索引掩码并将相应的值设置为 True。我会在几分钟内添加一些优化,以便它能更好地适应 n 的规模,这在 AGML 的当前解决方案中是不可能的。 - swenzel
显示剩余2条评论

5

这里是基准测试结果。我包含了自己的代码("op"),它首先循环遍历坏索引并将1...n添加到它们中,然后取唯一值以查找掩码索引。您可以在下面的代码中看到它,以及其他所有响应。

无论如何,以下是结果。第一维是数组大小沿x(从10到10e7),第二维是窗口大小沿y(5、50、500、5000)。然后是每个方格中的编码器,带有对数10得分,因为我们正在讨论微秒到分钟。

enter image description here

@swenzel似乎是获胜者,他的第二个答案取代了@moarningsun的第一个答案(moarningsun的第二个答案通过大量内存使用使机器崩溃,但这可能是因为它没有设计用于大的n或非稀疏a)。

图表不能完全反映这些贡献中最快的,因为(必要的)对数比例尺。它们比甚至是不错的循环解决方案快几十倍或几百倍。在最大的情况下,swenzel1比op快1000倍,而op已经利用了numpy。

请注意,我使用的是针对优化的英特尔MKL库编译的numpy版本,该库充分利用了自2012年以来存在的AVX指令。在某些矢量使用情况下,这将使i7/Xeon速度提高5倍。某些贡献可能受益更多。

这是运行到目前为止提交的所有答案(包括我的)的完整代码。函数allagree()确保结果是正确的,而timeall()将为您提供一个长格式的pandas数据帧,其中包含所有结果(以秒为单位)。

您可以相当容易地重新运行它以使用新代码或更改我的假设。请记住,我没有考虑其他因素,如内存使用。另外,我采用了R ggplot2作为图形,因为我不太熟悉seaborn/matplotlib,无法让它按我的意愿工作。

为了完整起见,所有结果都是一致的:

In [4]: allagree(n = 7, asize = 777)
Out[4]:
             AGML0 AGML1 askewchan0 askewchan1 askewchan2 moarningsun0  \
AGML0         True  True       True       True       True         True
AGML1         True  True       True       True       True         True
askewchan0    True  True       True       True       True         True
askewchan1    True  True       True       True       True         True
askewchan2    True  True       True       True       True         True
moarningsun0  True  True       True       True       True         True
swenzel0      True  True       True       True       True         True
swenzel1      True  True       True       True       True         True
op            True  True       True       True       True         True

             swenzel0 swenzel1    op
AGML0            True     True  True
AGML1            True     True  True
askewchan0       True     True  True
askewchan1       True     True  True
askewchan2       True     True  True
moarningsun0     True     True  True
swenzel0         True     True  True
swenzel1         True     True  True
op               True     True  True

感谢所有提交的人!

在使用R中的pd.to_csv和read.csv导出timeall()输出后,用于制作图形的代码:

ww <- read.csv("ww.csv")    
ggplot(ww, aes(x=coder, y=value, col = coder)) + geom_point(size = 3) + scale_y_continuous(trans="log10")+ facet_grid(nsize ~ asize) + theme(axis.text.x = element_text(angle = 90, hjust = 1)) + ggtitle("Fastest by coder") + ylab("time (seconds)")

测试用代码:

# test Stack Overflow 32706135 nan shift routines

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from timeit import Timer
from scipy import ndimage
from skimage import morphology
import itertools
import pdb
np.random.seed(8472)


def AGML0(a, n):                               # loop itertools
    maskleft = np.where(np.isnan(a))[0]
    maskright = maskleft + n
    mask = np.zeros(len(a),dtype=bool)
    for l,r in itertools.izip(maskleft,maskright): 
        mask[l:r] = True
    return mask


def AGML1(a, n):                               # loop n
    nn = n - 1
    maskleft = np.where(np.isnan(a))[0]
    ghost_mask = np.zeros(len(a)+nn,dtype=bool)
    for i in range(0, nn+1):
        thismask = maskleft + i
        ghost_mask[thismask] = True
    mask = ghost_mask[:len(ghost_mask)-nn]
    return mask


def askewchan0(a, n):
    m = np.isnan(a)
    i = np.arange(1, len(m)+1)
    ind = np.column_stack([i-n, i]) # may be a faster way to generate this
    ind.clip(0, len(m)-1, out=ind)
    return np.bitwise_or.reduceat(m, ind.ravel())[::2]


def askewchan1(a, n):
    m = np.isnan(a)
    s = np.full(n, True, bool)
    return ndimage.binary_dilation(m, structure=s, origin=-(n//2))


def askewchan2(a, n):
    m = np.isnan(a)
    s = np.zeros(2*n - n%2, bool)
    s[-n:] = True
    return morphology.binary_dilation(m, selem=s)


def moarningsun0(a, n):
    mask = np.isnan(a)
    cs = np.cumsum(mask)
    cs[n:] -= cs[:-n].copy()
    return cs > 0


def moarningsun1(a, n):
    mask = np.isnan(a)
    idx = np.flatnonzero(mask)
    expanded_idx = idx[:,None] + np.arange(1, n)
    np.put(mask, expanded_idx, True, 'clip')
    return mask


def swenzel0(a, n):
    m = np.isnan(a)
    k = m.copy()
    for i in range(1, n):
        k[i:] |= m[:-i]
    return k


def swenzel1(a, n=4):
    m = np.isnan(a)
    k = m.copy()

    # lenM and lenK say for each mask how many
    # subsequent Trues there are at least
    lenM, lenK = 1, 1

    # we run until a combination of both masks will give us n or more
    # subsequent Trues
    while lenM+lenK < n:
        # append what we have in k to the end of what we have in m
        m[lenM:] |= k[:-lenM]

        # swap so that m is again the small one
        m, k = k, m

        # update the lengths
        lenM, lenK = lenK, lenM+lenK

    # see how much m has to be shifted in order to append the missing Trues
    k[n-lenM:] |= m[:-n+lenM]
    return k


def op(a, n):
    m = np.isnan(a)
    for x in range(1, n):
        m = np.logical_or(m, np.r_[False, m][:-1])
    return m


# all the functions in a list. NB these are the actual functions, not their names
funcs = [AGML0, AGML1, askewchan0, askewchan1, askewchan2, moarningsun0, swenzel0, swenzel1, op]

def allagree(fns = funcs, n = 10, asize = 100):
    """ make sure result is the same from all functions """
    fnames = [f.__name__ for f in fns]
    a = np.random.rand(asize)
    a[np.random.randint(0, asize, int(asize / 10))] = np.nan
    results = dict([(f.__name__, f(a, n)) for f in fns])
    isgood = [[np.array_equal(results[f1], results[f2]) for f1 in fnames] for f2 in fnames]
    pdgood = pd.DataFrame(isgood, columns = fnames, index = fnames)
    if not all([all(x) for x in isgood]):
        print "not all results identical"
        pdb.set_trace()
    return pdgood


def timeone(f):
    """ time one of the functions across the full range of a nd n """
    print "Timing", f.__name__
    Ns = np.array([10**x for x in range(0, 4)]) * 5 # 5 to 5000 window size
    As = [np.random.rand(10 ** x) for x in range(1, 8)] # up to 10 million data data points
    for i in range(len(As)): # 10% of points are always bad
        As[i][np.random.randint(0, len(As[i]), len(As[i]) / 10)] = np.nan
    results = np.array([[Timer(lambda: f(a, n)).timeit(number = 1) if n < len(a) \
                        else np.nan for n in Ns] for a in As])
    pdresults = pd.DataFrame(results, index = [len(x) for x in As], columns = Ns)
    return pdresults


def timeall(fns = funcs):
    """ run timeone for all known funcs """
    testd = dict([(x.__name__, timeone(x)) for x in fns])
    testdf = pd.concat(testd.values(), axis = 0, keys = testd.keys())
    testdf.index.names = ["coder", "asize"]
    testdf.columns.names = ["nsize"]
    testdf.reset_index(inplace = True)
    testdf = pd.melt(testdf, id_vars = ["coder", "asize"])
    return testdf

有点跑题,但是你是怎么制作那张图片的? - AGML
@AGML:R ggplot 2 库。代码已包含在上面。只需将 timeall() 的结果导出为 csv,作为变量(此处称为 ww)带入 R 中,它应该可以不变地工作。尝试了半天在 Seaborn 中复制,最终回到了图形仍然统治的 R。 - Thomas Browne
2
从问题中的图片我期望看到一个漂亮的基准概述...你做到了 :) - user2379410

4

这也可以被视为一种形态学膨胀问题,使用 morphological dilation,在此使用 scipy.ndimage.binary_dilation

def dilation(a, n):
    m = np.isnan(a)
    s = np.full(n, True, bool)
    return ndimage.binary_dilation(m, structure=s, origin=-(n//2))

关于origin的说明:此参数确保structure(我会称之为一个核心)从input(您的掩码m)的左侧开始一点。通常,out[i]处的值将是在in[i]处具有structure中心(即structure[n//2])的膨胀值,但您希望structure[0]位于in[i]处。
您还可以使用在左侧填充了False的核来完成此操作,这是如果您使用scikit-imagebinary_dilation时所需的方式。
def dilation_skimage(a, n):
    m = np.isnan(a)
    s = np.zeros(2*n - n%2, bool)
    s[-n:] = True
    return skimage.morphology.binary_dilation(m, selem=s)

时间似乎在这两者之间并没有太大的变化。
dilation_scipy
small:    10 loops, best of 3: 47.9 ms per loop
large: 10000 loops, best of 3: 88.9 µs per loop

dilation_skimage
small:    10 loops, best of 3: 47.0 ms per loop
large: 10000 loops, best of 3: 91.1 µs per loop

谁说Python科学生态系统不是绝对棒的,也没有吸引到最优秀的人才呢?Askewchan,你就是大师。我已经发现你的原始答案非常适合我的用例。我正在准备一些与循环版本相比较的基准测试,今晚会发布。 - Thomas Browne
哈哈,谢谢你的赞美,@Thomas。帮助你是我的荣幸。时间和因此最佳选择将取决于len(a)np.count_nonzero(m)n本身的相对大小。不要忽视@AGML的答案,它在某些情况下更快。 - askewchan
非常聪明!我很好奇要看一下时间,因为“对一些索引周围的窗口做些事情”似乎是一个相当常见的问题。 - AGML
你用来计时的数组是什么样子的? - swenzel

3

你是想要这样的效果吗?

maskleft = np.where(np.isnan(a))[0]
maskright = maskleft + n
mask = np.zeros(len(a),dtype=bool)
for l,r in itertools.izip(maskleft,maskright): 
   mask[l:r] = True

或者,因为n很小,最好循环处理:

maskleft = np.where(np.isnan(a))[0]
mask = np.zeros(len(a),dtype=bool)
for i in range(0,n):
  thismask = maskleft+i
  mask[thismask] = True

除了针对n的循环外,上述内容已完全向量化。但是该循环是完全可并行化的,因此,如果你愿意努力,可以使用例如multiprocessing或Cython来获得n倍加速效果。
编辑:根据@askewchan的解决方案2,可能会导致超出范围错误。它还在range(0,n)中存在索引问题。可能需要进行以下更正:
maskleft = np.where(np.isnan(a))[0]
ghost_mask = np.zeros(len(a)+n,dtype=bool)
for i in range(0, n+1):
    thismask = maskleft + i
    ghost_mask[thismask] = True
mask = ghost_mask[:len(ghost_mask)-n]

不错,但我们正在循环。我希望Numpy、Scipy或Pandas的某些组合有一种聪明的向量化形式来完成这个任务。也许我太挑剔了。不幸的是,我已经在一个多进程队列中同时使用所有处理器来处理多个数据系列。实际上,我希望有一种漂亮的代数方式,能够以尽可能快的速度运行它,并利用AVX寄存器的优势。我意识到我在这里要求得非常苛刻......你的循环可能已经足够快了,我会试一试。 - Thomas Browne
1
我认为完全避免循环是不可能的,但是只要 n 的规模较小,您就可以通过将循环并行化来获得相同的性能。 - AGML
如果这仍然太慢,那么你可能要涉足Cython领域了,不幸的是。如果需要的话,这里有一个类似的问题可以作为起点:https://www.reddit.com/r/learnpython/comments/2xqlwj/using_npwhere_to_find_subarrays/ - AGML
你的第一个解决方案在小范围内比我能想出的任何东西都要快。你的第二个答案没有正确处理n接近结尾的True值(使索引maskleft + i超出范围)。 - askewchan
请查看以下AGML的结果...随意更改代码。 - Thomas Browne

3
您可以使用 np.ufunc.reduceat 函数,以及 np.bitwise_or 函数一起使用:
import numpy as np
a = np.array([4, 0, 8, 5, 10, 9, np.nan, 1, 4, 9, 9, np.nan, np.nan, 9,
              9, 8, 0, 3, 7, 9, 2, 6, 7, 2, 9, 4, 1, 1, np.nan, 10])
m = np.isnan(a)
n = 4
i = np.arange(1, len(m)+1)
ind = np.column_stack([i-n, i]) # may be a faster way to generate this
ind.clip(0, len(m)-1, out=ind)

np.bitwise_or.reduceat(m, ind.ravel())[::2]

关于您的数据:

print np.column_stack([m, reduced])
[[False False]
 [False False]
 [False False]
 [False False]
 [False False]
 [False False]
 [ True  True]
 [False  True]
 [False  True]
 [False  True]
 [False False]
 [ True  True]
 [ True  True]
 [False  True]
 [False  True]
 [False  True]
 [False False]
 [False False]
 [False False]
 [False False]
 [False False]
 [False False]
 [False False]
 [False False]
 [False False]
 [False False]
 [False False]
 [False False]
 [ True  True]
 [False  True]]

3
你可以使用与运行平均滤波器相同的累计求和技巧:
def cumsum_trick(a, n):
    mask = np.isnan(a)
    cs = np.cumsum(mask)
    cs[n:] -= cs[:-n].copy()
    return cs > 0

不幸的是,由于操作顺序的问题,需要额外的.copy()。虽然可以说服numpy反向应用减法,但为了使其工作,cs数组必须具有负步幅:

def cumsum_trick_nocopy(a, n):
    mask = np.isnan(a)
    cs = np.cumsum(mask, out=np.empty_like(a, int)[::-1])
    cs[n:] -= cs[:-n]
    out = cs > 0
    return out

但这种方法似乎很脆弱,而且速度也没有太大提升。

我想知道是否有一个编译好的信号处理函数可以执行这个精确操作...


对于稀疏的初始掩码和较小的n,这个方法也相当快:

def index_expansion(a, n):
    mask = np.isnan(a)
    idx = np.flatnonzero(mask)
    expanded_idx = idx[:,None] + np.arange(1, n)
    np.put(mask, expanded_idx, True, 'clip')
    return mask

需要复制的原因是读取和写入cs元素的顺序。我将就地ufunc应用程序与花式索引混淆了,后者确实使用了某种缓冲。 - user2379410

2
几年后,我想出了一个完全向量化的解决方案,不需要循环或复制(除了掩码本身)。这个解决方案有点(潜在地)危险,因为它使用了 numpy.lib.stride_tricks.as_strided。它也不如 @swentzel's solution 快。
思路是取掩码并创建一个二维视图,其中第二维仅是跟随当前元素的元素。然后,如果头部为 True,就可以将整个列设置为 True。由于你正在处理一个视图,因此设置一列实际上会设置掩码中的以下元素。
从数据开始:
import numpy as np
a = np.array([4, 0, 8, 5, 10, 9, np.nan, 1, 4, 9, 9, np.nan, np.nan, 9,\
              9, 8, 0, 3, 7, 9, 2, 6, 7, 2, 9, 4, 1, 1, np.nan, 10])
n = 3

现在,我们将使掩码的长度为 a.size + n,这样您就不必手动处理最后的 n 个元素了:
mask = np.empty(a.size + n, dtype=np.bool)
np.isnan(a, out=mask[:a.size])
mask[a.size:] = False

现在是最酷的部分:
view = np.lib.stride_tricks.as_strided(mask, shape=(n + 1, a.size),
                                       strides=mask.strides * 2)

那最后一部分非常关键。`mask.strides` 是一个元组,如 `(1,)`(因为布尔值通常跨越这么多字节)。将其加倍意味着您需要在任何维度中移动一个元素时,需要进行 1 字节步长的移动。
现在您只需要扩展掩码即可:
view[1:, view[0]] = True

这就是了。现在 mask 里有你想要的东西。请记住,这只能工作是因为赋值索引在最后一个更改的值之前。你不能使用 view[1:] |= view[0]
为了测试目的,似乎问题中 n 的定义已经改变,因此以下函数将考虑到这一点:
def madphysicist0(a, n):
    m = np.empty(a.size + n - 1, dtype=np.bool)
    np.isnan(a, out=m[:a.size])
    m[a.size:] = False

    v = np.lib.stride_tricks.as_strided(m, shape=(n, a.size), strides=m.strides * 2)
    v[1:, v[0]] = True
    return v[0]

从已有的最快答案中借鉴,我们只需要复制log2(n)行,而不是n行:
def madphysicist1(a, n):
    m = np.empty(a.size + n - 1, dtype=np.bool)
    np.isnan(a, out=m[:a.size])
    m[a.size:] = False

    v = np.lib.stride_tricks.as_strided(m, shape=(n, a.size), strides=m.strides * 2)

    stop = int(np.log2(n))
    for k in range(1, stop + 1):
        v[k, v[0]] = True
    if (1<<k) < n:
        v[-1, v[(1<<k) - 1]] = True
    return v[0]

由于每次迭代都会使掩码大小加倍,因此它比斐波那契方法更快。参考链接:https://math.stackexchange.com/q/894743/295281

你的madphysicist1没有通过allagree()测试。我将仅测试第一个例程madphysicist0。你可能想要检查并编辑它。 - Thomas Browne
我已经对你进行了基准测试,你很快,但并不像swenzel1或moarningstar那样快。也许你的madphysicist1例程会更快?但正如我所说,它的输出不正确。一旦输出正确,我会再次进行基准测试。这里是一个包括你在内的结果图表链接:https://www.dropbox.com/s/rntk0wpieobwz9v/madphysicist0_included.png?dl=0 - Thomas Browne
你似乎在 FALSE 连续运行中多次添加了一个 FALSE,可以通过 madphysicist1(a, n) 进行修正。 - Thomas Browne
@ThomasBrowne。我会去看看的。 - Mad Physicist

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接