在一个NumPy数组中找到相同值序列的长度(运行长度编码)

63
在一个pylab程序中(可能也可以是matlab程序),我有一个numpy数字数组表示距离:d[t] 是时间 t 的距离(我的数据的时间跨度为len(d)个时间单位)。
我感兴趣的事件是当距离低于某个阈值时,我想计算这些事件的持续时间。用 b = d<threshold 得到一个布尔数组很容易,问题就变成了计算b 中 True-only 单词的长度序列。但是我不知道如何高效地做到这一点(即使用numpy原语),我不得不遍历数组并进行手动更改检测(即当值从False变为True时初始化计数器,在值为True时增加计数器,并在值回到False时将计数器输出到序列)。但这是非常慢的。
如何有效地在numpy数组中检测这种序列呢?
以下是一些Python代码,说明了我的问题:第四个圆点需要很长时间才能出现(如果没有,请增加数组大小)。
from pylab import *

threshold = 7

print '.'
d = 10*rand(10000000)

print '.'

b = d<threshold

print '.'

durations=[]
for i in xrange(len(b)):
    if b[i] and (i==0 or not b[i-1]):
        counter=1
    if  i>0 and b[i-1] and b[i]:
        counter+=1
    if (b[i-1] and not b[i]) or i==len(b)-1:
        durations.append(counter)

print '.'
7个回答

77

针对任何数组完全实现了numpy矢量化和通用的RLE(也适用于字符串、布尔值等)。

输出运行长度、起始位置和值的元组。

import numpy as np

def rle(inarray):
        """ run length encoding. Partial credit to R rle function. 
            Multi datatype arrays catered for including non Numpy
            returns: tuple (runlengths, startpositions, values) """
        ia = np.asarray(inarray)                # force numpy
        n = len(ia)
        if n == 0: 
            return (None, None, None)
        else:
            y = ia[1:] != ia[:-1]               # pairwise unequal (string safe)
            i = np.append(np.where(y), n - 1)   # must include last element posi
            z = np.diff(np.append(-1, i))       # run lengths
            p = np.cumsum(np.append(0, z))[:-1] # positions
            return(z, p, ia[i])

相当快(i7):

xx = np.random.randint(0, 5, 1000000)
%timeit yy = rle(xx)
100 loops, best of 3: 18.6 ms per loop

多种数据类型:

rle([True, True, True, False, True, False, False])
Out[8]: 
(array([3, 1, 1, 2]),
 array([0, 3, 4, 5]),
 array([ True, False,  True, False], dtype=bool))

rle(np.array([5, 4, 4, 4, 4, 0, 0]))
Out[9]: (array([1, 4, 2]), array([0, 1, 5]), array([5, 4, 0]))

rle(["hello", "hello", "my", "friend", "okay", "okay", "bye"])
Out[10]: 
(array([2, 1, 1, 2, 1]),
 array([0, 2, 3, 4, 6]),
 array(['hello', 'my', 'friend', 'okay', 'bye'], 
       dtype='|S6'))

和上面的Alex Martelli得到的结果相同:

xx = np.random.randint(0, 2, 20)

xx
Out[60]: array([1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1, 0, 1, 1, 0, 1, 1, 1, 1, 1])

am = runs_of_ones_array(xx)

tb = rle(xx)

am
Out[63]: array([4, 5, 2, 5])

tb[0][tb[2] == 1]
Out[64]: array([4, 5, 2, 5])

%timeit runs_of_ones_array(xx)
10000 loops, best of 3: 28.5 µs per loop

%timeit rle(xx)
10000 loops, best of 3: 38.2 µs per loop

比 Alex 稍微慢一些(但仍然非常快),而且更加灵活。


警告 - 此函数在 np.nan 的运行中会出现错误。正在修复。 - Thomas Browne

60

虽然不是numpy原语,但itertools函数通常非常快速,因此请尝试使用此函数(当然,包括此函数的各种解决方案,请测量时间):

def runs_of_ones(bits):
  for bit, group in itertools.groupby(bits):
    if bit: yield sum(group)
如果你确实需要将值存储在列表中,你可以使用list(runs_of_ones(bits))来完成;当然,也可以使用列表解析,其速度可能会稍微更快:
def runs_of_ones_list(bits):
  return [sum(g) for b, g in itertools.groupby(bits) if b]

转向“numpy-native”可能性,考虑以下内容:

def runs_of_ones_array(bits):
  # make sure all runs of ones are well-bounded
  bounded = numpy.hstack(([0], bits, [0]))
  # get 1 at run starts and -1 at run ends
  difs = numpy.diff(bounded)
  run_starts, = numpy.where(difs > 0)
  run_ends, = numpy.where(difs < 0)
  return run_ends - run_starts

再次强调:一定要在适合你的实际例子中对解决方案进行基准测试!


非常感谢!“diff/where”解决方案恰好符合我的想法(更不用说它比其他解决方案快10倍)。如果您觉得这不是太聪明,请说我不聪明,但我希望自己聪明到能想出这个解决方案 :-) - Gyom
@gnovice,我不做matlab(有趣的是,我的女儿现在是高级无线电工程博士候选人,她会;-),但现在看到你的答案,我确实看到了类比--获取结束运行时间减去开始运行时间,通过定位差异中的<0和>0点来获取这些,并用零填充位以确保所有连续的1都结束。猜想没有太多方法可以解决这个“运行长度”问题!-) - Alex Martelli
@Gyom,不客气——就像@gnovice所暗示的那样,matlab解决方案也类似,或者我猜如果一个人知道matlab,那么它也会是类似的 - 所以两者都不太聪明;-)......更多的是关于以前必须运行长度编码的问题(我的大部分编辑时间都是在从Numeric翻译,这是我仍然本能地转向的东西,到了更好的numpy - 但是我最早学习这些技术的地方是30年前,当时我还是硬件设计师,用的是APL!)。 - Alex Martelli

13

这里有一个仅使用数组的解决方案:它接收一个包含布尔值序列的数组,并计算转换的长度。

>>> from numpy import array, arange
>>> b = array([0,0,0,1,1,1,0,0,0,1,1,1,1,0,0], dtype=bool)
>>> sw = (b[:-1] ^ b[1:]); print sw
[False False  True False False  True False False  True False False False
  True False]
>>> isw = arange(len(sw))[sw]; print isw
[ 2  5  8 12]
>>> lens = isw[1::2] - isw[::2]; print lens
[3 4]

sw 中含有 true,对应于开关,isw 将它们转换为索引。然后,lens 中的项目成对相减。

请注意,如果序列以 1 开头,则会计算 0 序列的长度:可以在索引中修正此问题以计算 lens。还未测试过长度为 1 的序列等边角情况。


完整函数返回所有 true 子数组的起始位置和长度。

import numpy as np

def count_adjacent_true(arr):
    assert len(arr.shape) == 1
    assert arr.dtype == np.bool
    if arr.size == 0:
        return np.empty(0, dtype=int), np.empty(0, dtype=int)
    sw = np.insert(arr[1:] ^ arr[:-1], [0, arr.shape[0]-1], values=True)
    swi = np.arange(sw.shape[0])[sw]
    offset = 0 if arr[0] else 1
    lengths = swi[offset+1::2] - swi[offset:-1:2]
    return swi[offset:-1:2], lengths

测试了不同的布尔类型一维数组(空数组;单个/多个元素;偶数/奇数长度;以True/False开头;只有True/False元素)。


6

如果有人感兴趣(并且因为您提到了MATLAB),这里是在MATLAB中解决它的一种方法:

threshold = 7;
d = 10*rand(1,100000);  % Sample data
b = diff([false (d < threshold) false]);
durations = find(b == -1)-find(b == 1);

我对Python不是很熟悉,但这可能有助于为您提供一些想法。=)

1
diff() 在 numpy 中也存在,因此这基本上就是你想要的,只需将 find(foo) 替换为 where(foo)[0] 即可。 - dwf

5
也许有些晚了,但基于Numba的方法将是迄今为止最快的。
import numpy as np
import numba as nb


@nb.njit
def count_steps_nb(arr):
    result = 1
    last_x = arr[0]
    for x in arr[1:]:
        if last_x != x:
            result += 1
            last_x = x
    return result


@nb.njit
def rle_nb(arr):
    n = count_steps_nb(arr)
    values = np.empty(n, dtype=arr.dtype)
    lengths = np.empty(n, dtype=np.int_)
    last_x = arr[0]
    length = 1
    i = 0
    for x in arr[1:]:
        if last_x != x:
            values[i] = last_x
            lengths[i] = length
            i += 1
            length = 1
            last_x = x
        else:
            length += 1
    values[i] = last_x
    lengths[i] = length
    return values, lengths

基于Numpy的方法(受@ThomasBrowne答案启发,但更快,因为昂贵的numpy.concatenate()的使用被降至最低)是亚军(这里介绍了两种类似的方法,一种使用不等式来找到步骤的位置,另一种使用差异)。
import numpy as np


def rle_np_neq(arr):
    n = len(arr)
    if n == 0:
        values = np.empty(0, dtype=arr.dtype)
        lengths = np.empty(0, dtype=np.int_)
    else:
        positions = np.concatenate(
            [[-1], np.nonzero(arr[1:] != arr[:-1])[0], [n - 1]])
        lengths = positions[1:] - positions[:-1]
        values = arr[positions[1:]]
    return values, lengths

import numpy as np


def rle_np_diff(arr):
    n = len(arr)
    if n == 0:
        values = np.empty(0, dtype=arr.dtype)
        lengths = np.empty(0, dtype=np.int_)
    else:
        positions = np.concatenate(
            [[-1], np.nonzero(arr[1:] - arr[:-1])[0], [n - 1]])
        lengths = positions[1:] - positions[:-1]
        values = arr[positions[1:]]
        return values, lengths

这两种方法都优于朴素和简单的单循环方法:
import numpy as np


def rle_loop(arr):
    values = []
    lengths = []
    last_x = arr[0]
    length = 1
    for x in arr[1:]:
        if last_x != x:
            values.append(last_x)
            lengths.append(length)
            length = 1
            last_x = x
        else:
            length += 1
    values.append(last_x)
    lengths.append(length)
    return np.array(values), np.array(lengths)

相反,使用 itertools.groupby() 不会比简单的循环更快(除非在非常特殊的情况下,例如 @AlexMartelli answer 中所述,或者有人在组对象上实现了 __len__),因为通常没有简单的方法来提取组大小信息,除了通过对组本身进行循环,这不是很快的方法:
import itertools
import numpy as np


def rle_groupby(arr):
    values = []
    lengths = []
    for x, group in itertools.groupby(arr):
        values.append(x)
        lengths.append(sum(1 for _ in group))
    return np.array(values), np.array(lengths)

一些基准测试结果针对不同大小的随机整数数组进行了报告:

bm_full bm_zoom

(完整分析在此处)。


0
durations = []
counter   = 0

for bool in b:
    if bool:
        counter += 1
    elif counter > 0:
        durations.append(counter)
        counter = 0

if counter > 0:
    durations.append(counter)

1
当然,这更加简洁,但同样低效;我想要做的是通过使用一些巧妙的numpy调用将循环移动到C层... - Gyom
请检查我的编辑答案,我现在提供了一个“聪明的组合”(总是尽力不要太聪明了;-),但请测量那个和基于itertools.groupby的解决方案的速度,并告诉我们哪一个更快(以及快多少),在你看来真实的例子中! - Alex Martelli

0
import numpy as np
a = np.array([3, 3, 3, 1, 3, 3, 3, 3, 1, 1, 3, 3, 3, 3, 3])
is_not_in_run = (a != 3)
is_not_in_run = np.concatenate(([True], is_not_in_run, [True]))
gap_indices = np.where(is_not_in_run)[0]
run_lengths = np.diff(gap_indices) - 1
run_lengths = np.delete(run_lengths, np.where(run_lengths == 0)[0])
print(run_lengths)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接