实时时间序列数据中的峰值信号检测

401

更新:到目前为止,表现最佳的算法是这个。
这个问题探讨了在实时时间序列数据中检测突然峰值的强大算法。
考虑以下示例数据:

Plot of data

这些数据的示例是以Matlab格式呈现的(但这个问题不是关于语言,而是关于算法)。
p = [1 1 1.1 1 0.9 1 1 1.1 1 0.9 1 1.1 1 1 0.9 1 1 1.1 1 1 1 1 1.1 0.9 1 1.1 1 1 0.9, ...
     1 1.1 1 1 1.1 1 0.8 0.9 1 1.2 0.9 1 1 1.1 1.2 1 1.5 1 3 2 5 3 2 1 1 1 0.9 1 1, ... 
     3 2.6 4 3 3.2 2 1 1 0.8 4 4 2 2.5 1 1 1];

你可以清楚地看到有三个大峰和一些小峰。这个数据集是关于时间序列数据集类的一个具体例子。这类数据集具有两个一般特征:
1. 存在基本噪声,具有一般均值。 2. 存在显著偏离噪声的大“峰值”或“较高数据点”。
让我们还假设以下情况:
- 峰值的宽度无法事先确定。 - 峰值的高度与其他数值显著偏离。 - 算法实时更新(即每个新数据点都进行更新)。
对于这种情况,需要构建一个触发信号的边界值。然而,边界值不能是静态的,必须使用算法实时确定。
我的问题是:有没有一个好的算法可以实时计算这样的阈值?对于这种情况,是否有特定的算法?最著名的算法有哪些?
强大的算法或有用的见解都非常受欢迎。(可以用任何语言回答:这是关于算法的)

1
https://towardsdatascience.com/real-time-time-series-anomaly-detection-981cf1e1ca13 - Marco Cerliani
40个回答

621
鲁棒的峰值检测算法(使用z-分数)
我提出了一种在这类数据集上非常有效的算法。它基于离散度的原理:如果一个新的数据点与移动平均值相比偏离了给定的x个标准差,算法就会发出信号。该算法非常鲁棒,因为它构建了一个独立的移动平均值和偏差,以确保之前的信号不会影响未来信号的阈值。因此,该算法的灵敏度对之前的信号是鲁棒的。
该算法需要3个输入:
1. 滞后。计算历史数据均值和标准差的移动窗口的滞后。较长的窗口会考虑更多历史数据。较短的窗口更具适应性,算法会更快地适应新信息。例如,滞后为5表示使用最近的5个观测值平滑数据。
2. 阈值。算法发出信号的“z-分数”。简单来说,如果新数据点与移动均值之间的距离大于阈值乘以数据的移动标准差,算法会提供一个信号。例如,阈值为3.5表示如果数据点与移动均值相差3.5个标准差,算法会发出信号。
3. 影响。新信号对移动均值和移动标准差计算的影响(介于0和1之间)。例如,影响参数为0.5表示新信号只有正常数据点影响的一半。同样地,影响为0表示完全忽略信号以重新计算新的阈值。因此,影响为0是最稳健的选择(但假设数据具有平稳性);将影响选项设为1是最不稳健的。对于非平稳数据,影响选项应该介于0和1之间。
它的工作原理如下:
伪代码:
# Let y be a vector of timeseries data of at least length lag+2
# Let mean() be a function that calculates the mean
# Let std() be a function that calculates the standard deviaton
# Let absolute() be the absolute value function

# Settings (these are examples: choose what is best for your data!)
set lag to 5;          # average and std. are based on past 5 observations
set threshold to 3.5;  # signal when data point is 3.5 std. away from average
set influence to 0.5;  # between 0 (no influence) and 1 (full influence)

# Initialize variables
set signals to vector 0,...,0 of length of y;   # Initialize signal results
set filteredY to y(1),...,y(lag)                # Initialize filtered series
set avgFilter to null;                          # Initialize average filter
set stdFilter to null;                          # Initialize std. filter
set avgFilter(lag) to mean(y(1),...,y(lag));    # Initialize first value average
set stdFilter(lag) to std(y(1),...,y(lag));     # Initialize first value std.

for i=lag+1,...,t do
  if absolute(y(i) - avgFilter(i-1)) > threshold*stdFilter(i-1) then
    if y(i) > avgFilter(i-1) then
      set signals(i) to +1;                     # Positive signal
    else
      set signals(i) to -1;                     # Negative signal
    end
    set filteredY(i) to influence*y(i) + (1-influence)*filteredY(i-1);
  else
    set signals(i) to 0;                        # No signal
    set filteredY(i) to y(i);
  end
  set avgFilter(i) to mean(filteredY(i-lag+1),...,filteredY(i));
  set stdFilter(i) to std(filteredY(i-lag+1),...,filteredY(i));
end

以下是选择数据的好参数的经验法则。
演示

Demonstration of robust thresholding algorithm

这个演示的Matlab代码可以在这里找到。要使用演示,只需运行它并通过点击上面的图表来创建一个时间序列。在绘制了lag个观测值之后,算法开始工作。

结果

对于原始问题,在使用以下设置时,该算法将给出以下输出:滞后 = 30,阈值 = 5,影响力 = 0

Thresholding algorithm example


不同编程语言的实现方式:

配置算法的经验法则

滞后。滞后参数决定了数据平滑的程度以及算法对数据长期平均值变化的适应性。如果你的数据是稳定的,你应该包含更多的滞后期(这样可以提高算法的鲁棒性)。如果你的数据包含时间变化的趋势,你应该考虑算法对这些趋势的快速适应程度。例如,如果你将滞后设置为10,那么在任何系统性变化的长期平均值中,算法的阈值将在10个“周期”之后进行调整。因此,根据你的数据的趋势行为和你希望算法的适应性选择滞后参数。

影响力。该参数确定信号对算法检测阈值的影响程度。如果设置为0,信号对阈值没有影响,因此未来的信号是基于一个由平均值和标准差计算出的阈值来检测的,该阈值不受过去信号的影响。如果设置为0.5,信号对正常数据点的影响力为一半。另一种思考方式是,如果将影响力设置为0,就暗示了稳定性(即无论有多少信号,您始终期望时间序列在长期内返回到相同的平均值)。如果情况不是这样的,您应该将影响力参数设置在0和1之间的某个位置,具体取决于信号对数据的时变趋势能够系统地产生多大影响。例如,如果信号导致时间序列的长期平均值出现结构性突变,则应将影响力参数设置得较高(接近1),以便阈值能够快速响应结构性突变。 阈值。阈值参数是算法将新数据点分类为信号的移动平均值上方的标准差数。例如,如果一个新数据点的标准差是移动平均值的4.0倍,而阈值参数设定为3.5,算法将把该数据点识别为信号。该参数应根据您期望的信号数量进行设置。例如,如果您的数据服从正态分布,阈值(或z分数)为3.5对应的信号概率为0.00047(来自此表),这意味着您预计每2128个数据点会出现一个信号(1/0.00047)。因此,阈值直接影响算法的敏感性,从而决定算法发出信号的频率。请检查您自己的数据,并选择一个合理的阈值,使算法在您希望发出信号时发出(可能需要一些试错来找到适合您目的的好阈值)。

警告:上述代码每次运行时总是循环遍历所有数据点。在实现此代码时,请确保将信号的计算拆分为一个单独的函数(不包含循环)。然后,当有新的数据点到达时,只需更新filteredYavgFilterstdFilter一次。不要每次有新的数据点时都重新计算所有数据的信号(就像上面的示例中那样),这样在实时应用中会非常低效和慢。

修改算法的其他方法(以提高性能)包括:

  1. 使用中位数而不是平均值
  2. 使用鲁棒尺度测量,例如中位绝对偏差(MAD),而不是标准差
  3. 使用信号边界,以便信号不会频繁切换
  4. 改变影响参数的工作方式
  5. 对待上升下降信号有所不同(非对称处理)
  6. 为均值和标准差创建一个单独的影响参数(如此Swift翻译中所示

(已知的)对这个StackOverflow答案的学术引用:

使用此答案中的算法的其他作品

这个答案中算法的其他应用

其他峰值检测算法的链接


如何引用此算法:

Brakel, J.P.G. van (2014). "使用z分数的鲁棒峰值检测算法". Stack Overflow. 可在以下网址找到: https://dev59.com/C2Eh5IYBdhLWcg3wTB1c#22640362 (版本:2020-11-08)。

Bibtex @misc{brakel2014, author = {Brakel, J.P.G van}, title = {使用z分数的鲁棒峰值检测算法}, url = {https://dev59.com/C2Eh5IYBdhLWcg3wTB1c#22640362}, language = {en}, year = {2014}, urldate = {2022-04-12}, journal = {Stack Overflow}, howpublished = {https://dev59.com/C2Eh5IYBdhLWcg3wTB1c#22640362}}


如果你在某个地方使用了这个功能,请通过上面的参考链接给我署名。如果你对算法有任何问题,请在下面的评论中发表或通过LinkedIn与我联系。

2
有很多方法可以改进这个算法,所以要有创意(不同的上/下处理方式;中位数代替平均值;鲁棒标准差;将代码编写为内存高效的函数;阈值边界,使信号不会过于频繁地切换等)。 - Jean-Paul
2
@datapug 这个算法是专门设计用于实时数据处理的,因此在计算信号时未来值是未知的。你是否拥有整个时间序列的先前信息?如果是这样,你确实可以反转数据。 - Jean-Paul
2
@Jean-Paul,是的,现在我明白了...我的问题是我试图模拟一个峰值,导致了一些我无法解释的问题...请看这里:https://imgur.com/a/GFz59jl 正如你所看到的-在信号恢复到原始值之后-标准偏差保持为0。 - Yitzchak
2
@Yitzchak 我明白了。的确,该算法假定信号的持续时间小于峰值的持续时间。在您的情况下,标准差确实会趋近于零(因为每个filteredY(i) = filteredY(i-1))。如果您想使算法适用于您的情况(influence = 0),一个快速而肮脏的解决方案是将行set filteredY(i) to influence*y(i) + (1-influence)*filteredY(i-1);更改为set filteredY(i) to filteredY(i-lag)。这样,阈值将简单地从峰值发生之前的值中循环使用。请参见此处演示 - Jean-Paul
1
如果整个数据集提前已知,您可以使用历史和未来值来计算移动平均滤波器。因此,在这种情况下,您可以将移动平均窗口置于您正在尝试查找峰值的观察周围。如果您提前知道所有数据,还有很多其他算法可供使用。例如,请参见https://docs.scipy.org/doc/scipy/reference/signal.html#peak-finding - Jean-Paul
显示剩余37条评论

64

下面是平滑z-score算法的Python/numpy实现(参见上面的答案)。你可以在这里找到gist

#!/usr/bin/env python
# Implementation of algorithm from https://dev59.com/C2Eh5IYBdhLWcg3wTB1c#22640362
import numpy as np
import pylab

def thresholding_algo(y, lag, threshold, influence):
    signals = np.zeros(len(y))
    filteredY = np.array(y)
    avgFilter = [0]*len(y)
    stdFilter = [0]*len(y)
    avgFilter[lag - 1] = np.mean(y[0:lag])
    stdFilter[lag - 1] = np.std(y[0:lag])
    for i in range(lag, len(y)):
        if abs(y[i] - avgFilter[i-1]) > threshold * stdFilter [i-1]:
            if y[i] > avgFilter[i-1]:
                signals[i] = 1
            else:
                signals[i] = -1

            filteredY[i] = influence * y[i] + (1 - influence) * filteredY[i-1]
            avgFilter[i] = np.mean(filteredY[(i-lag+1):i+1])
            stdFilter[i] = np.std(filteredY[(i-lag+1):i+1])
        else:
            signals[i] = 0
            filteredY[i] = y[i]
            avgFilter[i] = np.mean(filteredY[(i-lag+1):i+1])
            stdFilter[i] = np.std(filteredY[(i-lag+1):i+1])

    return dict(signals = np.asarray(signals),
                avgFilter = np.asarray(avgFilter),
                stdFilter = np.asarray(stdFilter))

以下是对同一数据集进行的测试,其结果与原回答中的 R/Matlab 绘图相同。

# Data
y = np.array([1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
       1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,
       2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1])

# Settings: lag = 30, threshold = 5, influence = 0
lag = 30
threshold = 5
influence = 0

# Run algo with settings from above
result = thresholding_algo(y, lag=lag, threshold=threshold, influence=influence)

# Plot result
pylab.subplot(211)
pylab.plot(np.arange(1, len(y)+1), y)

pylab.plot(np.arange(1, len(y)+1),
           result["avgFilter"], color="cyan", lw=2)

pylab.plot(np.arange(1, len(y)+1),
           result["avgFilter"] + threshold * result["stdFilter"], color="green", lw=2)

pylab.plot(np.arange(1, len(y)+1),
           result["avgFilter"] - threshold * result["stdFilter"], color="green", lw=2)

pylab.subplot(212)
pylab.step(np.arange(1, len(y)+1), result["signals"], color="red", lw=2)
pylab.ylim(-1.5, 1.5)
pylab.show()

这里的'y'实际上是信号,而'signals'是数据点集合,我的理解是正确的吗? - TheTank
2
@TheTank y 是您传入的数据数组,signals 是一个 +1-1 的输出数组,用于指示每个数据点 y[i] 是否是根据您使用的设置而言的“显著峰值”。 - Jean-Paul

31
一种方法是基于以下观察来检测峰值:
时间t是峰值,如果(y(t) > y(t-1)) && (y(t) > y(t+1))
它通过等待上涨趋势结束来避免误报。从某种意义上说,它不完全是“实时的”,因为它将会错过一个dt的高峰。灵敏度可以通过需要比较的边界来控制。在嘈杂的检测和检测延迟之间存在权衡。您可以通过添加更多参数来丰富模型:
当(y(t) - y(t-dt) > m) && (y(t) - y(t+dt) > m)时,是高峰
其中dt和m是用于控制灵敏度和时间延迟的参数
这就是使用上述算法得到的结果:
在Python中再现绘图的代码如下:
import numpy as np
import matplotlib.pyplot as plt
input = np.array([ 1. ,  1. ,  1. ,  1. ,  1. ,  1. ,  1. ,  1.1,  1. ,  0.8,  0.9,
    1. ,  1.2,  0.9,  1. ,  1. ,  1.1,  1.2,  1. ,  1.5,  1. ,  3. ,
    2. ,  5. ,  3. ,  2. ,  1. ,  1. ,  1. ,  0.9,  1. ,  1. ,  3. ,
    2.6,  4. ,  3. ,  3.2,  2. ,  1. ,  1. ,  1. ,  1. ,  1. ])
signal = (input > np.roll(input,1)) & (input > np.roll(input,-1))
plt.plot(input)
plt.plot(signal.nonzero()[0], input[signal], 'ro')
plt.show()

通过设置 m = 0.5,您可以获得一个更干净的信号,并且只有一个误报: enter image description here

我该如何改变灵敏度? - Jean-Paul
我可以想到两种方法:1:将m设置为较大的值,以便仅检测到较大的峰值。2:不要计算y(t) - y(t-dt)(和y(t) - y(t+dt)),而是从t-dt到t(和t到t+dt)进行积分。 - aha
3
你根据什么标准拒绝了其他七个山峰? - hotpaw2
6
平顶存在问题,因为它基本上是一维边缘检测(例如使用[1 0 -1]卷积信号)。 - ben

24
在信号处理中,峰值检测通常通过小波变换实现。您基本上需要对时间序列数据进行离散小波变换。返回的细节系数中的零交叉点将对应于时间序列信号中的峰值。您可以在不同的细节系数级别上检测到不同的峰值幅度,这提供了多级分辨率。

1
你的答案让我找到了这篇文章(http://en.wikipedia.org/wiki/Change_detection)和这个回答(https://dev59.com/YGcs5IYBdhLWcg3wcTgz#13660660),将帮助我构建一个好的算法来实现我的项目。谢谢! - Jean-Paul

21

这是一个可以处理实时数据流的Python版本(在每个新数据点到达时不重新计算所有数据点)。您可能想调整类函数返回的内容 - 对于我的目的,我只需要信号。

import numpy as np


class real_time_peak_detection():
    def __init__(self, array, lag, threshold, influence):
        self.y = list(array)
        self.length = len(self.y)
        self.lag = lag
        self.threshold = threshold
        self.influence = influence
        self.signals = [0] * len(self.y)
        self.filteredY = np.array(self.y).tolist()
        self.avgFilter = [0] * len(self.y)
        self.stdFilter = [0] * len(self.y)
        self.avgFilter[self.lag - 1] = np.mean(self.y[0:self.lag]).tolist()
        self.stdFilter[self.lag - 1] = np.std(self.y[0:self.lag]).tolist()

    def thresholding_algo(self, new_value):
        self.y.append(new_value)
        i = len(self.y) - 1
        self.length = len(self.y)
        if i < self.lag:
            return 0
        elif i == self.lag:
            self.signals = [0] * len(self.y)
            self.filteredY = np.array(self.y).tolist()
            self.avgFilter = [0] * len(self.y)
            self.stdFilter = [0] * len(self.y)
            self.avgFilter[self.lag] = np.mean(self.y[0:self.lag]).tolist()
            self.stdFilter[self.lag] = np.std(self.y[0:self.lag]).tolist()
            return 0

        self.signals += [0]
        self.filteredY += [0]
        self.avgFilter += [0]
        self.stdFilter += [0]

        if abs(self.y[i] - self.avgFilter[i - 1]) > (self.threshold * self.stdFilter[i - 1]):

            if self.y[i] > self.avgFilter[i - 1]:
                self.signals[i] = 1
            else:
                self.signals[i] = -1

            self.filteredY[i] = self.influence * self.y[i] + \
                (1 - self.influence) * self.filteredY[i - 1]
            self.avgFilter[i] = np.mean(self.filteredY[(i - self.lag):i])
            self.stdFilter[i] = np.std(self.filteredY[(i - self.lag):i])
        else:
            self.signals[i] = 0
            self.filteredY[i] = self.y[i]
            self.avgFilter[i] = np.mean(self.filteredY[(i - self.lag):i])
            self.stdFilter[i] = np.std(self.filteredY[(i - self.lag):i])

        return self.signals[i]

我做错了什么?当我使用问题中提供的数据和您的代码时,我只得到了零,即out_signals = real_time_peak_detection(y,30,5,0).signals。 - undefined

15
在计算拓扑学中,持久同调的思想导致了一种高效的解决方案——快速排序数字。它不仅可以检测峰值,而且可以以自然的方式量化峰值的“重要性”,从而使您能够选择对您有意义的峰值。
算法摘要。 在一维情况下(时间序列、实值信号),该算法可以通过以下图示轻松描述:

Most persistent peaks

将函数图(或其子级集)视为一个地形,考虑从无穷大水平(或此图片中的1.8)开始下降的水位。当水位下降时,局部极大值岛屿会出现。在局部极小值处,这些岛屿会合并在一起。这个想法中的一个细节是,后来出现的岛屿会合并到更老的岛屿中。岛屿的“持久性”是它的出生时间减去它的死亡时间。蓝色条的长度描绘了持久性,这就是上述峰值的“重要性”。
效率。 在函数值排序之后,找到一个线性时间实现并不太难-实际上只需要一个简单的循环。因此,这个实现在实践中应该很快,并且也很容易实现。
参考文献。 整个故事的写作以及对持久同调(计算代数拓扑学领域)动机的引用可以在这里找到:https://www.sthu.org/blog/13-perstopology-peakdetection/index.html

1
所谓实时数据,是指通过所谓的在线算法逐个接收数据点。峰值的重要性可能由未来的数值决定。如果能在不牺牲时间复杂度太多的情况下修改过去的结果,将算法扩展为在线算法将是一件好事。 - S. Huber
蓝色条的长度对我来说没有意义。看起来它们总是指前面的局部最小值,而不是后面的最小值。在我看来,它们应该同时指两者,这意味着#1和3会更短。 - Tobias Knauss
此外,我们实际上可以将其推广到二维,并且这里的“左”和“右”已经失去了意义。请参阅 https://www.sthu.org/code/codesnippets/imagepers.html 以及链接的两个stackoverflow问题。 - S. Huber
1
我在C++中实现了相同的算法,比上面链接的Python实现快约45倍。 C++实现可在此处找到(https://gitlab.com/balping/find-peaks)。享受吧。 - balping
@S.Huber:即使在二维中,您也可以适当地创建蓝色条形图:条形图的长度应为峰值减去最近的局部最小值,其中“最近”与XY平面上的线性距离有关。 - Tobias Knauss
显示剩余3条评论

14

我们尝试对数据集使用平滑的z-score算法,这会导致过于敏感或不敏感(取决于参数如何调整),并且很少有中间地带。在我们网站的流量信号中,我们观察到一个低频基线,代表每日周期,即使使用最好的参数(如下所示),它仍然会特别缓慢,特别是第四天,因为大多数数据点被识别为异常。

在原始z-score算法的基础上,我们想出了一种通过反向过滤来解决这个问题的方法。修改后算法的详细信息以及其在电视广告流量归因方面的应用发布在我们的团队博客上。

enter image description here


2
很高兴看到算法成为你更高级版本的起点。你的数据具有非常特殊的模式,因此使用其他技术首先去除模式然后在残差上应用算法会更有意义。或者,您可能希望使用居中而不是滞后窗口来计算平均值/标准差。另一个评论:您的解决方案从右向左移动以识别峰值,但这在实时应用程序中是不可能的(这就是为什么原始算法如此简单,因为未来信息是无法访问的)。 - Jean-Paul

13

附录1:原始答案的MatlabR翻译

Matlab代码

function [signals,avgFilter,stdFilter] = ThresholdingAlgo(y,lag,threshold,influence)
% Initialise signal results
signals = zeros(length(y),1);
% Initialise filtered series
filteredY = y(1:lag+1);
% Initialise filters
avgFilter(lag+1,1) = mean(y(1:lag+1));
stdFilter(lag+1,1) = std(y(1:lag+1));
% Loop over all datapoints y(lag+2),...,y(t)
for i=lag+2:length(y)
    % If new value is a specified number of deviations away
    if abs(y(i)-avgFilter(i-1)) > threshold*stdFilter(i-1)
        if y(i) > avgFilter(i-1)
            % Positive signal
            signals(i) = 1;
        else
            % Negative signal
            signals(i) = -1;
        end
        % Make influence lower
        filteredY(i) = influence*y(i)+(1-influence)*filteredY(i-1);
    else
        % No signal
        signals(i) = 0;
        filteredY(i) = y(i);
    end
    % Adjust the filters
    avgFilter(i) = mean(filteredY(i-lag:i));
    stdFilter(i) = std(filteredY(i-lag:i));
end
% Done, now return results
end

例子:

% Data
y = [1 1 1.1 1 0.9 1 1 1.1 1 0.9 1 1.1 1 1 0.9 1 1 1.1 1 1,...
    1 1 1.1 0.9 1 1.1 1 1 0.9 1 1.1 1 1 1.1 1 0.8 0.9 1 1.2 0.9 1,...
    1 1.1 1.2 1 1.5 1 3 2 5 3 2 1 1 1 0.9 1,...
    1 3 2.6 4 3 3.2 2 1 1 0.8 4 4 2 2.5 1 1 1];

% Settings
lag = 30;
threshold = 5;
influence = 0;

% Get results
[signals,avg,dev] = ThresholdingAlgo(y,lag,threshold,influence);

figure; subplot(2,1,1); hold on;
x = 1:length(y); ix = lag+1:length(y);
area(x(ix),avg(ix)+threshold*dev(ix),'FaceColor',[0.9 0.9 0.9],'EdgeColor','none');
area(x(ix),avg(ix)-threshold*dev(ix),'FaceColor',[1 1 1],'EdgeColor','none');
plot(x(ix),avg(ix),'LineWidth',1,'Color','cyan','LineWidth',1.5);
plot(x(ix),avg(ix)+threshold*dev(ix),'LineWidth',1,'Color','green','LineWidth',1.5);
plot(x(ix),avg(ix)-threshold*dev(ix),'LineWidth',1,'Color','green','LineWidth',1.5);
plot(1:length(y),y,'b');
subplot(2,1,2);
stairs(signals,'r','LineWidth',1.5); ylim([-1.5 1.5]);

R 代码
ThresholdingAlgo <- function(y,lag,threshold,influence) {
  signals <- rep(0,length(y))
  filteredY <- y[1:lag]
  avgFilter <- NULL
  stdFilter <- NULL
  avgFilter[lag] <- mean(y[1:lag], na.rm=TRUE)
  stdFilter[lag] <- sd(y[1:lag], na.rm=TRUE)
  for (i in (lag+1):length(y)){
    if (abs(y[i]-avgFilter[i-1]) > threshold*stdFilter[i-1]) {
      if (y[i] > avgFilter[i-1]) {
        signals[i] <- 1;
      } else {
        signals[i] <- -1;
      }
      filteredY[i] <- influence*y[i]+(1-influence)*filteredY[i-1]
    } else {
      signals[i] <- 0
      filteredY[i] <- y[i]
    }
    avgFilter[i] <- mean(filteredY[(i-lag):i], na.rm=TRUE)
    stdFilter[i] <- sd(filteredY[(i-lag):i], na.rm=TRUE)
  }
  return(list("signals"=signals,"avgFilter"=avgFilter,"stdFilter"=stdFilter))
}

例子:

# Data
y <- c(1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
       1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,
       2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1)

lag       <- 30
threshold <- 5
influence <- 0

# Run algo with lag = 30, threshold = 5, influence = 0
result <- ThresholdingAlgo(y,lag,threshold,influence)

# Plot result
par(mfrow = c(2,1),oma = c(2,2,0,0) + 0.1,mar = c(0,0,2,1) + 0.2)
plot(1:length(y),y,type="l",ylab="",xlab="") 
lines(1:length(y),result$avgFilter,type="l",col="cyan",lwd=2)
lines(1:length(y),result$avgFilter+threshold*result$stdFilter,type="l",col="green",lwd=2)
lines(1:length(y),result$avgFilter-threshold*result$stdFilter,type="l",col="green",lwd=2)
plot(result$signals,type="S",col="red",ylab="",xlab="",ylim=c(-1.5,1.5),lwd=2)

这段代码(两种语言)将为原始问题的数据产生以下结果:

Thresholding example from Matlab code


附录2:原始答案的附加部分:Matlab演示代码

(点击以生成数据)

Matlab demo

function [] = RobustThresholdingDemo()

%% SPECIFICATIONS
lag         = 5;       % lag for the smoothing
threshold   = 3.5;     % number of st.dev. away from the mean to signal
influence   = 0.3;     % when signal: how much influence for new data? (between 0 and 1)
                       % 1 is normal influence, 0.5 is half      
%% START DEMO
DemoScreen(30,lag,threshold,influence);

end

function [signals,avgFilter,stdFilter] = ThresholdingAlgo(y,lag,threshold,influence)
signals = zeros(length(y),1);
filteredY = y(1:lag+1);
avgFilter(lag+1,1) = mean(y(1:lag+1));
stdFilter(lag+1,1) = std(y(1:lag+1));
for i=lag+2:length(y)
    if abs(y(i)-avgFilter(i-1)) > threshold*stdFilter(i-1)
        if y(i) > avgFilter(i-1)
            signals(i) = 1;
        else
            signals(i) = -1;
        end
        filteredY(i) = influence*y(i)+(1-influence)*filteredY(i-1);
    else
        signals(i) = 0;
        filteredY(i) = y(i);
    end
    avgFilter(i) = mean(filteredY(i-lag:i));
    stdFilter(i) = std(filteredY(i-lag:i));
end
end

% Demo screen function
function [] = DemoScreen(n,lag,threshold,influence)
figure('Position',[200 100,1000,500]);
subplot(2,1,1);
title(sprintf(['Draw data points (%.0f max)      [settings: lag = %.0f, '...
    'threshold = %.2f, influence = %.2f]'],n,lag,threshold,influence));
ylim([0 5]); xlim([0 50]);
H = gca; subplot(2,1,1);
set(H, 'YLimMode', 'manual'); set(H, 'XLimMode', 'manual');
set(H, 'YLim', get(H,'YLim')); set(H, 'XLim', get(H,'XLim'));
xg = []; yg = [];
for i=1:n
    try
        [xi,yi] = ginput(1);
    catch
        return;
    end
    xg = [xg xi]; yg = [yg yi];
    if i == 1
        subplot(2,1,1); hold on;
        plot(H, xg(i),yg(i),'r.'); 
        text(xg(i),yg(i),num2str(i),'FontSize',7);
    end
    if length(xg) > lag
        [signals,avg,dev] = ...
            ThresholdingAlgo(yg,lag,threshold,influence);
        area(xg(lag+1:end),avg(lag+1:end)+threshold*dev(lag+1:end),...
            'FaceColor',[0.9 0.9 0.9],'EdgeColor','none');
        area(xg(lag+1:end),avg(lag+1:end)-threshold*dev(lag+1:end),...
            'FaceColor',[1 1 1],'EdgeColor','none');
        plot(xg(lag+1:end),avg(lag+1:end),'LineWidth',1,'Color','cyan');
        plot(xg(lag+1:end),avg(lag+1:end)+threshold*dev(lag+1:end),...
            'LineWidth',1,'Color','green');
        plot(xg(lag+1:end),avg(lag+1:end)-threshold*dev(lag+1:end),...
            'LineWidth',1,'Color','green');
        subplot(2,1,2); hold on; title('Signal output');
        stairs(xg(lag+1:end),signals(lag+1:end),'LineWidth',2,'Color','blue');
        ylim([-2 2]); xlim([0 50]); hold off;
    end
    subplot(2,1,1); hold on;
    for j=2:i
        plot(xg([j-1:j]),yg([j-1:j]),'r'); plot(H,xg(j),yg(j),'r.');
        text(xg(j),yg(j),num2str(j),'FontSize',7);
    end
end
end


11

在此发现了Palshikar(2009)的另一种算法:

Palshikar,G.(2009)。时间序列峰值检测的简单算法。发表于第一届高级数据分析、商业分析和智能国际会议论文集中(第122卷)。

可以从这里下载该论文。

该算法如下:

algorithm peak1 // one peak detection algorithms that uses peak function S1 

input T = x1, x2, …, xN, N // input time-series of N points 
input k // window size around the peak 
input h // typically 1 <= h <= 3 
output O // set of peaks detected in T 

begin 
O = empty set // initially empty 

    for (i = 1; i < n; i++) do
        // compute peak function value for each of the N points in T 
        a[i] = S1(k,i,xi,T); 
    end for 

    Compute the mean m' and standard deviation s' of all positive values in array a; 

    for (i = 1; i < n; i++) do // remove local peaks which are “small” in global context 
        if (a[i] > 0 && (a[i] – m') >( h * s')) then O = O + {xi}; 
        end if 
    end for 

    Order peaks in O in terms of increasing index in T 

    // retain only one peak out of any set of peaks within distance k of each other 

    for every adjacent pair of peaks xi and xj in O do 
        if |j – i| <= k then remove the smaller value of {xi, xj} from O 
        end if 
    end for 
end

优点

  • 本文提供了5种不同的峰值检测算法。
  • 这些算法适用于原始时间序列数据(不需要平滑化)。

缺点

  • 很难事先确定kh的值。
  • 峰值不能是平坦的(例如测试数据中的第三个峰值)。

示例:

enter image description here


这篇论文非常有趣。在他的观点中,S4似乎是更好的函数。但更重要的是要澄清当k<i<N-k不成立时的情况。 如何定义函数S1(S2,..)?对于i=0,我只是忽略第一个操作数并且没有除以2,而对于每个其他的i,我都包括了两个操作数,但对于i<=k,左边的操作数比右边的少。 - daniels_pa

10

继@Jean-Paul提出的解决方案之后,我已经在C#中实现了他的算法。

public class ZScoreOutput
{
    public List<double> input;
    public List<int> signals;
    public List<double> avgFilter;
    public List<double> filtered_stddev;
}

public static class ZScore
{
    public static ZScoreOutput StartAlgo(List<double> input, int lag, double threshold, double influence)
    {
        // init variables!
        int[] signals = new int[input.Count];
        double[] filteredY = new List<double>(input).ToArray();
        double[] avgFilter = new double[input.Count];
        double[] stdFilter = new double[input.Count];

        var initialWindow = new List<double>(filteredY).Skip(0).Take(lag).ToList();

        avgFilter[lag - 1] = Mean(initialWindow);
        stdFilter[lag - 1] = StdDev(initialWindow);

        for (int i = lag; i < input.Count; i++)
        {
            if (Math.Abs(input[i] - avgFilter[i - 1]) > threshold * stdFilter[i - 1])
            {
                signals[i] = (input[i] > avgFilter[i - 1]) ? 1 : -1;
                filteredY[i] = influence * input[i] + (1 - influence) * filteredY[i - 1];
            }
            else
            {
                signals[i] = 0;
                filteredY[i] = input[i];
            }

            // Update rolling average and deviation
            var slidingWindow = new List<double>(filteredY).Skip(i - lag).Take(lag+1).ToList();

            var tmpMean = Mean(slidingWindow);
            var tmpStdDev = StdDev(slidingWindow);

            avgFilter[i] = Mean(slidingWindow);
            stdFilter[i] = StdDev(slidingWindow);
        }

        // Copy to convenience class 
        var result = new ZScoreOutput();
        result.input = input;
        result.avgFilter       = new List<double>(avgFilter);
        result.signals         = new List<int>(signals);
        result.filtered_stddev = new List<double>(stdFilter);

        return result;
    }

    private static double Mean(List<double> list)
    {
        // Simple helper function! 
        return list.Average();
    }

    private static double StdDev(List<double> values)
    {
        double ret = 0;
        if (values.Count() > 0)
        {
            double avg = values.Average();
            double sum = values.Sum(d => Math.Pow(d - avg, 2));
            ret = Math.Sqrt((sum) / (values.Count() - 1));
        }
        return ret;
    }
}

用法示例:

var input = new List<double> {1.0, 1.0, 1.1, 1.0, 0.9, 1.0, 1.0, 1.1, 1.0, 0.9, 1.0,
    1.1, 1.0, 1.0, 0.9, 1.0, 1.0, 1.1, 1.0, 1.0, 1.0, 1.0, 1.1, 0.9, 1.0, 1.1, 1.0, 1.0, 0.9,
    1.0, 1.1, 1.0, 1.0, 1.1, 1.0, 0.8, 0.9, 1.0, 1.2, 0.9, 1.0, 1.0, 1.1, 1.2, 1.0, 1.5, 1.0,
    3.0, 2.0, 5.0, 3.0, 2.0, 1.0, 1.0, 1.0, 0.9, 1.0, 1.0, 3.0, 2.6, 4.0, 3.0, 3.2, 2.0, 1.0,
    1.0, 0.8, 4.0, 4.0, 2.0, 2.5, 1.0, 1.0, 1.0};

int lag = 30;
double threshold = 5.0;
double influence = 0.0;

var output = ZScore.StartAlgo(input, lag, threshold, influence);

2
嗨,我认为那段代码有错误,在StdDev方法中,你使用了values.Count()-1,应该是真的需要-1吗?我认为你想要的是项目数量,这正是从values.Count()获取的。 - Viktor
3
嗯...发现得不错。虽然我最初将算法转换为C#,但我从未使用过它。我可能会用nuget库MathNet中的函数来替换整个函数。"Install-Package MathNet.Numerics" 它具有PopulationStandardDeviation()和StandardDeviation()的预建函数;例如:var populationStdDev = new List<double>(1,2,3,4).PopulationStandardDeviation(); var sampleStdDev = new List<double>(1,2,3,4).StandardDeviation(); - Ocean Airdrop

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接