如何迭代计算加权移动平均值,使得最后的数值占据最大的权重?

16
我想要实现一个迭代算法,可以计算加权平均值。具体的权重规则并不重要,但最新的值应该接近1,而最旧的值应该接近0。
这个算法应该是迭代式的,也就是说它不应该记住所有之前的值。它只需要知道一个最新的值和任何有关以往汇总信息的信息,例如平均值、总和、数量等。
是否有可能实现这个算法呢?
例如,以下算法可能是:
void iterate(double value) {
   sum *= 0.99;
   sum += value;
   count++;
   avg = sum / count;
}

它将给出指数递减权重,这可能不好。是否可能有步长递减权重或其他方式?

编辑1

权重法的要求如下:

1)权重随时间递减 2)我有一些平均值或特征持续时间,因此比这个时间更早的值的重要性要小得多 3)我应该能够设置这个持续时间

编辑2

我需要以下内容。假设v_i是值,其中v_1是第一个。还假设w_i是权重。但是w_0是最后一个。

因此,在第一个值出现后,我会得到第一个平均值。

 a_1 = v_1 * w_0

在第二个值v_2出现后,我应该有平均值

 a_2 = v_1 * w_1 + v_2 * w_0

我应该拥有下一个值

 a_3 = v_1 * w_2 + v_2 * w_1 + v_3 * w_0

请注意,重量配置文件会随着我的移动而移动,当我在值序列上移动时。

也就是说,每个值并不总是拥有自己的权重。我的目标是当向过去移动时降低这个权重。


2
你说特定的权重规则并不重要,但是你又说指数下降不好。那么这个函数有一些要求吗?它们是什么?而且,不能使用阶跃函数,因为这需要记忆过去的值才能实现。 - max
您提出的方法被称为EWMA(指数加权移动平均),并且被广泛使用。其他窗口函数也是可能的,但不像这么直接:您必须能够编写递归关系式。 - Alexandre C.
@Alexandre,这是我的问题:我可以使用哪些其他窗口,并如何使用递归关系。 - Suzan Cioc
https://en.wikipedia.org/wiki/GARCH 似乎描述了这类事物的一个家族(其中“记忆”的量是一个参数)。此外,显而易见但值得强调的是,窗口移动平均仅需要有限的内存(即窗口长度)。 - andrew cooke
这个问题不是可以通过一种无需记忆的方式来解决吗?比如说,简单地执行 Avg = 0.25CurrentValue + 0.75Avg 的迭代操作,以任何你想要的采样率进行。这将使当前值的权重最高,并模拟一个无限序列,而无需存储所有过去的值。如果你想在特定时间点开始计算平均值,可以使用 CurrentValue 预加载 Avg 的初始值。或者,我所描述的只是所谓的数据平滑,与你在这里讨论的内容无关吗?我知道这已经晚了。 - Evan B
9个回答

30

首先简单介绍一下背景。如果我们使用普通平均数,它会是这样的:

average(a) = 11
average(a,b) = (average(a)+b)/2
average(a,b,c) = (average(a,b)*2 + c)/3
average(a,b,c,d) = (average(a,b,c)*3 + d)/4

正如您在这里看到的,这是一种“在线”算法,我们只需要跟踪两个数据片段:1)平均值中的总数和2)平均值本身。然后,我们可以通过总数来将平均值进行“取消除”,加上新数字,再将其除以新的总数。

加权平均值有点不同。这取决于什么类型的加权平均值。例如,如果您定义了:

weightedAverage(a,wa, b,wb, c,wc, ..., z,wz) = a*wa + b*wb + c*wc + ... + w*wz
 or
weightedAverage(elements, weights) = elements·weights

如果你定义的加权平均值类似于概率中的期望值,那么除了添加新元素和权重之外,你不需要做任何事情!

但是如果不是这样,你需要做一些修改。

weightedAverage(elements,weights) = elements·weights / sum(weights)

如果你需要跟踪总权重,那么你需要保持追踪。 不是通过元素总数来除以它,而是通过总权重来进行未除运算,加上新元素*权重,然后除以新的总权重

或者你可以不用进行未除运算,如下所示:你可以在闭包或对象中跟踪临时点积和权重总和,并在生成时进行除法(这可以帮助避免由于复利舍入误差导致的数字不准确问题)。

在Python中,代码如下:

def makeAverager():
    dotProduct = 0
    totalWeight = 0

    def averager(newValue, weight):
        nonlocal dotProduct,totalWeight

        dotProduct += newValue*weight
        totalWeight += weight
        return dotProduct/totalWeight

    return averager

演示:

>>> averager = makeAverager()
>>> [averager(value,w) for value,w in [(100,0.2), (50,0.5), (100,0.1)]]
[100.0, 64.28571428571429, 68.75]
>>> averager(10,1.1)
34.73684210526316
>>> averager(10,1.1)
25.666666666666668
>>> averager(30,2.0)
27.4

在你的情况下,权重也是相同的。但我的任务是每次新值到达时重新计算平均值,并对旧值进行重新加权。 - Suzan Cioc

5

> 但我的任务是每次有新值到达时重新计算平均值,以旧值重新加权。-OP

您的任务几乎总是不可能完成,即使使用非常简单的加权方案也是如此。

您要求在O(1)内存的情况下,产生具有不断变化的加权方案的平均值。例如,{values·weights1(values + [newValue2])·weights2(values + [newValue2,newValue3])·weights3,...},因为一些近乎任意的变化的权重序列而被传递。由于可注入性,这是不可能的。一旦将数字合并在一起,就会失去大量信息。例如,即使您拥有权重向量,也无法恢复原始值向量,反之亦然。我只能想到两种情况可以逃脱:

  • 恒定的权重,如[2,2,2,…2]:这相当于在线平均算法,您不希望这样做,因为旧值没有被“重新加权”。
  • 以前的答案的相对权重不改变。例如,您可以使用[8,4,2,1]的权重,并添加具有任意权重的新元素,如...+[1],但您必须将所有先前的元素乘以相同的乘法因子,例如[16,8,4,2]+[1]。因此,在每个步骤中,您都会添加一个新的任意权重和过去的新的任意重新缩放,因此您有2个自由度(如果您需要保持点积规范化,则只有1个)。您将获得的权重向量如下:

 

[w0]
[w0*(s1), w1]
[w0*(s1*s2), w1*(s2), w2]
[w0*(s1*s2*s3), w1*(s2*s3), w2*(s3), w3]
...

因此,任何你可以让它看起来像那样的加权方案都可以使用(除非你需要通过权重总和来保持规范化,在这种情况下,你必须将新平均值除以新总和,你可以通过仅保留O(1)内存来计算)。只需将先前的平均值乘以新的s(它将隐式地分布到权重中的点积中),并添加新的+w*newValue

2
我认为您正在寻找类似于这样的东西:

我想你需要的是这样的东西:

void iterate(double value) {
    count++;

    weight = max(0, 1 - (count / 1000));

    avg = ( avg * total_weight * (count - 1)  + weight * value) / (total_weight * (count - 1) + weight)
    total_weight += weight;
}

1

在这里,我假设您希望权重总和为1。只要您可以生成一个相对权重而不会在未来发生变化,您就可以得到一种模拟此行为的解决方案。

也就是说,假设您将权重定义为序列{s_0,s_1,s_2,...,s_n,...},并将输入定义为序列{i_0,i_1,i_2,...,i_n}

考虑形式:sum(s_0*i_0 + s_1*i_1 + s_2*i_2 + ... + s_n*i_n) / sum(s_0 + s_1 + s_2 + ... + s_n)。请注意,使用几个聚合计数器可以轻松地逐步计算它:

int counter = 0;
double numerator = 0;
double denominator = 0;

void addValue(double val)
{
    double weight = calculateWeightFromCounter(counter);
    numerator += weight * val;
    denominator += weight;
}

double getAverage()
{
    if (denominator == 0.0) return 0.0;
    return numerator / denominator;
}

当然,在这种情况下,calculateWeightFromCounter() 不应该生成总和为一的权重 -- 这里的诀窍在于我们通过将权重除以它们的总和来进行平均,因此最终,权重似乎虚拟地总和为一。

真正的诀窍在于如何执行 calculateWeightFromCounter()。例如,你可以简单地返回计数器本身,但请注意,最后一个加权数字可能不会接近计数器的总和,因此你可能无法得到你想要的确切属性。(很难说,因为如前所述,你留下了一个相当开放的问题。)


问题在于每个新值的权重都在改变。但在您的情况下,它们没有改变。 - Suzan Cioc
实际使用的权重随着每个新值的变化而改变——“权重”被逐渐较大的数字除以,从而强制实际使用的权重始终总和为1。 - Kaganar
你没有把分子和分母搞混了吧?如果是这样,那么你正在计算普通加权平均值,每个 s_i 都没有改变。 - Suzan Cioc
我肯定做到了。已经修复了 - 分子现在按照应有的方式累积权重*值。 - Kaganar
所以你正在计算正常加权平均。请看我的更新,了解我需要什么--加权配置文件要“随我而动”。 - Suzan Cioc

1

这段内容太长了,无法在评论中发布,但了解这些可能会很有用。

假设你有: w_0*v_n + ... w_n*v_0 (我们将其简称为w[0..n]*v[n..0]

那么下一步是: w_0*v_n1 + ... w_n1*v_0 (我们将其简称为w[0..n1]*v[n1..0]

这意味着我们需要一种方法来从w[0..n]*v[n..0]计算w[1..n1]*v[n..0]

当然,v[n..0]可能是0, ..., 0, z, 0, ..., 0,其中z位于某个位置x。

如果我们没有任何“额外”的存储空间,则f(z*w(x))=z*w(x + 1),其中w(x)是位置x的权重。

重新排列方程式:w(x + 1) = f(z*w(x))/z。对于一个特定的x,w(x + 1)最好是常数,因此f(z*w(x))/z最好是常数。因此,f必须让z传播--也就是说,f(z*w(x)) = z*f(w(x))
但在这里我们又有一个问题。注意,如果z(可以是任何数字)可以通过f传播,那么w(x)肯定也可以。所以f(z*w(x)) = w(x)*f(z)。因此,f(w(x)) = w(x)/f(z)
但是对于一个常数xw(x)是常数,因此f(w(x))也应该是常数。w(x)是常数,因此f(z)最好也是常数,以便w(x)/f(z)是常数。因此,f(w(x)) = w(x)/c,其中c是常数。

所以,当x是一个权重值时,f(x)=c*x,其中c是一个常数。

因此,w(x+1) = c*w(x)

也就是说,每个权重都是前一个的倍数。因此,权重采用w(x)=m*b^x的形式。

请注意,这假设f所拥有的唯一信息是最后聚合的值。请注意,除非您愿意存储表示输入的非常量数据,否则在某些时候您将被缩小到这种情况。您无法用实数表示无限长度的实数向量,但您可以以某种方式在恒定的有限存储量中进行逼近。但这只是一个近似。

虽然我没有严格证明它,但我的结论是,您想要的东西很难以高精度完成,但您可以使用对数(n)空间(对于许多实际应用程序而言,这可能是O(1))来生成高质量的近似值。您甚至可以使用更少的空间。


1
我尝试实际编写一些东西(在Java中)。正如所说,您的目标是无法实现的。您只能从一些最后记住的值中计算平均值。如果您不需要精确,可以近似旧值。我试图通过准确记住最后5个值和仅对5个值进行求和来记忆旧值,记住最后5个总和。然后,复杂度为O(2n),用于记忆最后n + n * n个值。这是一个非常粗略的近似值。
您可以根据需要修改“lastValues”和“lasAggregatedSums”数组大小。请参阅此ASCII艺术图片,尝试显示最后值的图形,显示第一列(旧数据)作为聚合值(不是单独的值),并且仅记住最早的5个值。
values:
            #####
            #####       #####        #
      ##### #####       #####        #  #
      ##### ##### ##### #####       ## ##
      ##### ##### ##### ##### ##### #####
time: --->
挑战 1:我的示例没有考虑权重,但我认为添加“lastAggregatedSums”的权重应该不成问题,只有一个问题,如果你想让旧值具有较低的权重,那么会更困难,因为数组是旋转的,所以不容易知道哪个数组成员对应哪个权重。也许你可以修改算法,始终将数组中的值“移动”而不是旋转?这样添加权重就不成问题了。

挑战 2:这些数组以0值初始化,并且这些值从一开始就计入平均值,即使我们还没有接收到足够的值。如果你运行算法很长时间,可能不会在意它在开始时学习一段时间。如果你在意,可以发表修改意见;-)

public class AverageCounter {
    private float[] lastValues = new float[5];
    private float[] lastAggregatedSums = new float[5];
    private int valIdx = 0;
    private int aggValIdx = 0;
    private float avg;

    public void add(float value) {
        lastValues[valIdx++] = value;
        if(valIdx == lastValues.length) {
            // count average of last values and save into the aggregated array.
            float sum = 0;
            for(float v: lastValues) {sum += v;}
            lastAggregatedSums[aggValIdx++] = sum;
            if(aggValIdx >= lastAggregatedSums.length) {
                // rotate aggregated values index
                aggValIdx = 0;
            }
            valIdx = 0;
        }
        float sum = 0;
        for(float v: lastValues) {sum += v;}
        for(float v: lastAggregatedSums) {sum += v;}
        avg = sum / (lastValues.length + lastAggregatedSums.length * lastValues.length);
    }

    public float getAvg() {
        return avg;
    }
}

1
一种无记忆的解决方案是通过对先前平均值和新值进行加权组合来计算新平均值:
average = (1 - P) * average + P * value

其中P是一个经验常数,0 <= P <= 1

展开得到:

average = sum i (weight[i] * value[i])  

其中value [0]是最新的值,而

weight[i] = P * (1 - P) ^ i

当P值较低时,历史数值会被赋予更高的权重。

P越接近1,它就越快地收敛到较新的数值。

当P=1时,这是一个常规分配且忽略先前的数值。

如果您想要最大化value[N]的贡献,请将其最大化。

weight[N] = P * (1 - P) ^ N  

其中 0 <= P <= 1

我发现当 weight[N] 最大化时

P = 1 / (N + 1)

0

您可以结合不同有效窗口大小(N)的指数平均值(加权和)以获得所需的权重。使用更多的指数平均值来更详细地定义您的权重配置文件。(更多的指数平均值也意味着需要存储和计算更多的值,因此这里需要权衡)


0
在Python 3.11中,我正在使用推导式来高效地计算价格和尺寸的加权平均值,并处理零除法。
prices = [1000, 2000, 3000, 4000]
sizes = [0.5, 1.0, 3.0, 6.0]

def weighted_average(prices, sizes):
    dot_product = sum(price * size for price, size in zip(prices, sizes))
    total_size = sum(sizes)
    return dot_product / total_size if total_size != 0 else 0

result = weighted_average(prices, sizes)
print(result)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接