使用MongoDB聚合框架进行移动平均?

11

如果你有50年的温度天气数据(每日记录)(例如),你如何计算该时间段内使用三个月间隔的移动平均值?你可以使用一次查询完成还是必须要多次查询?

Example Data

01/01/2014 = 40 degrees
12/31/2013 = 38 degrees
12/30/2013 = 29 degrees
12/29/2013 = 31 degrees
12/28/2013 = 34 degrees
12/27/2013 = 36 degrees
12/26/2013 = 38 degrees
.....

1
你的意思是什么?你想让某些值重叠吗?如果是这样,是哪些值?按天计算?还是只是滚动平均值。聚合框架实际上无法将一个文档与另一个文档进行比较,所以这听起来更像是mapReduce。 - Neil Lunn
@neil-lunn 我想计算滚动平均值...所以对于3个月的间隔,我想要取一天并将该天与过去3个月的数据进行平均,然后对接下来的50年每天都进行这样的平均。因此,我认为某些值会重叠以进行平均。如果无法使用聚合框架,则如何使用mapReduce来完成此操作。我认为你是对的,我必须比较单独的文档。谢谢! - mc.
@neil-lunn,看起来聚合框架确实无法做到这一点,你是对的... https://jira.mongodb.org/browse/SERVER-4437 .. 如果你有任何关于如何使用MongoDB的MapReduce来完成它的想法,请告诉我。 - mc.
我还想创建这个移动或滚动平均数据数组 - 关于移动平均的更多信息请参见https://en.wikipedia.org/wiki/Moving_average - mc.
6个回答

9

现在,agg框架已经内置了$map$reduce$range,因此数组处理变得更加简单。以下是一个示例,用于计算一组数据的移动平均值,您希望根据某些谓词进行过滤。基本设置是每个文档包含可过滤的条件和一个值,例如:

{sym: "A", d: ISODate("2018-01-01"), val: 10}
{sym: "A", d: ISODate("2018-01-02"), val: 30}

这是它:

// This controls the number of observations in the moving average:
days = 4;

c=db.foo.aggregate([

// Filter down to what you want.  This can be anything or nothing at all.
{$match: {"sym": "S1"}}

// Ensure dates are going earliest to latest:
,{$sort: {d:1}}

// Turn docs into a single doc with a big vector of observations, e.g.
//     {sym: "A", d: d1, val: 10}
//     {sym: "A", d: d2, val: 11}
//     {sym: "A", d: d3, val: 13}
// becomes
//     {_id: "A", prx: [ {v:10,d:d1}, {v:11,d:d2},  {v:13,d:d3} ] }
//
// This will set us up to take advantage of array processing functions!
,{$group: {_id: "$sym", prx: {$push: {v:"$val",d:"$date"}} }}

// Nice additional info.  Note use of dot notation on array to get
// just scalar date at elem 0, not the object {v:val,d:date}:
,{$addFields: {numDays: days, startDate: {$arrayElemAt: [ "$prx.d", 0 ]}} }

// The Juice!  Assume we have a variable "days" which is the desired number
// of days of moving average.
// The complex expression below does this in python pseudocode:
//
// for z in range(0, size of value vector - # of days in moving avg):
//    seg = vector[n:n+days]
//    values = seg.v
//    dates = seg.d
//    for v in seg:
//        tot += v
//    avg = tot/len(seg)
// 
// Note that it is possible to overrun the segment at the end of the "walk"
// along the vector, i.e. not enough date-values.  So we only run the
// vector to (len(vector) - (days-1).
// Also, for extra info, we also add the number of days *actually* used in the
// calculation AND the as-of date which is the tail date of the segment!
//
// Again we take advantage of dot notation to turn the vector of
// object {v:val, d:date} into two vectors of simple scalars [v1,v2,...]
// and [d1,d2,...] with $prx.v and $prx.d
//
,{$addFields: {"prx": {$map: {
    input: {$range:[0,{$subtract:[{$size:"$prx"}, (days-1)]}]} ,
    as: "z",
    in: {
       avg: {$avg: {$slice: [ "$prx.v", "$$z", days ] } },
       d: {$arrayElemAt: [ "$prx.d", {$add: ["$$z", (days-1)] } ]}
        }
        }}
    }}

            ]);

这可能会产生以下输出:
{
    "_id" : "S1",
    "prx" : [
        {
            "avg" : 11.738793632512115,
            "d" : ISODate("2018-09-05T16:10:30.259Z")
        },
        {
            "avg" : 12.420766702631376,
            "d" : ISODate("2018-09-06T16:10:30.259Z")
        },
        ...

    ],
    "numDays" : 4,
    "startDate" : ISODate("2018-09-02T16:10:30.259Z")
}

这个解决方案能处理大数据集吗?(比如说 1000 万份文件)? - Jayesh Singh
@JayeshSingh 取决于 $group。如果您考虑创建一个包含 20m 个 val:date 对的数组将超过 16m 的文档限制,那么您是正确的。 - Buzz Moschetti

5

在MongoDB中,我倾向于在每个文档中维护过去90天的运行总和,例如每天的值:

{"day": 1, "tempMax": 40, "tempMaxSum90": 2232}
{"day": 2, "tempMax": 38, "tempMaxSum90": 2230}
{"day": 3, "tempMax": 36, "tempMaxSum90": 2231}
{"day": 4, "tempMax": 37, "tempMaxSum90": 2233}

每当需要将新的数据点添加到集合中时,您可以通过以下两个简单的查询(一个加法和一个减法)有效地计算下一个总和,而不是读取和求和90个值。如下所示(伪代码):
tempMaxSum90(day) = tempMaxSum90(day-1) + tempMax(day) - tempMax(day-90)

每天的90天移动平均值仅是90天总和除以90。
如果您想提供不同时间尺度的移动平均线(例如,1周、30天、90天、1年),则可以使用每个文档一个总和数组而不是单个总和来维护。这种方法会增加存储空间和插入新数据的处理成本,但适用于大多数时间序列图表场景,其中新数据收集相对缓慢且需要快速检索。

2

从Mongo 5开始,这是新的$setWindowFields聚合操作符的完美使用案例:

请注意,为了简单起见,我考虑滚动平均值具有3天的窗口(今天和前两天):

// { date: ISODate("2013-12-26"), temp: 38 }
// { date: ISODate("2013-12-27"), temp: 36 }
// { date: ISODate("2013-12-28"), temp: 34 }
// { date: ISODate("2013-12-29"), temp: 31 }
// { date: ISODate("2013-12-30"), temp: 29 }
// { date: ISODate("2013-12-31"), temp: 38 }
// { date: ISODate("2014-01-01"), temp: 40 }
db.collection.aggregate([
  { $setWindowFields: {
    sortBy: { date: 1 },
    output: {
      movingAverage: {
        $avg: "$temp",
        window: { range: [-2, "current"], unit: "day" }
      }
    }
  }}
])
// { date: ISODate("2013-12-26"), temp: 38, movingAverage: 38 }
// { date: ISODate("2013-12-27"), temp: 36, movingAverage: 37 }
// { date: ISODate("2013-12-28"), temp: 34, movingAverage: 36 }
// { date: ISODate("2013-12-29"), temp: 31, movingAverage: 33.67 }
// { date: ISODate("2013-12-30"), temp: 29, movingAverage: 31.33 }
// { date: ISODate("2013-12-31"), temp: 38, movingAverage: 32.67 }
// { date: ISODate("2014-01-01"), temp: 40, movingAverage: 35.67 }

这个功能:

  • 按时间顺序排序文档:sortBy: { date: 1 }
  • 为每个文档创建一个文档范围(window),其中:
    • 包括"current"文档和前面所有在"2"-"day"窗口内的文档
  • 在该窗口内,计算平均温度:$avg: "$temp"

2

接受的答案对我有所帮助,但是我花了一些时间才理解它的工作原理,因此我想解释一下我的方法以帮助其他人。特别是在您的情况下,我认为我的答案会有所帮助。

这适用于较小的数据集

首先按天分组数据,然后将每天的所有日期附加到一个数组中:

{
  "$sort": {
    "Date": -1
  }
},
{
  "$group": {
    "_id": {
      "Day": "$Date",
      "Temperature": "$Temperature"
    },
    "Previous Values": {
      "$push": {
        "Date": "$Date",
        "Temperature": "$Temperature"
      }
    }
  }

这将为您留下一个看起来像这样的记录(它将被正确排序):
{"_id.Day": "2017-02-01", 
"Temperature": 40, 
"Previous Values": [
    {"Day": "2017-03-01", "Temperature": 20},
    {"Day": "2017-02-11", "Temperature": 22},
    {"Day": "2017-01-18", "Temperature": 03},
    ...
    ]},

现在每天都添加了所有日期,我们需要从“Previous Values”数组中删除比this _id.Day字段更近的项目,因为移动平均是向后看的:

{
  "$project": {
    "_id": 0,
    "Date": "$_id.Date",
    "Temperature": "$_id.Temperature",
    "Previous Values": 1
  }
},
{
  "$project": {
    "_id": 0,
    "Date": 1,
    "Temperature": 1,
    "Previous Values": {
      "$filter": {
        "input": "$Previous Values",
        "as": "pv",
        "cond": {
          "$lte": ["$$pv.Date", "$Date"]
        }
      }
    }
  }
},

先前值数组中的每个项目仅包含小于或等于每个记录日期的日期:

{"Day": "2017-02-01", 
"Temperature": 40, 
"Previous Values": [
    {"Day": "2017-01-31", "Temperature": 33},
    {"Day": "2017-01-30", "Temperature": 36},
    {"Day": "2017-01-29", "Temperature": 33},
    {"Day": "2017-01-28", "Temperature": 32},
    ...
    ]}

现在我们可以选择平均窗口大小,因为数据是按天计算的,对于一周,我们将取数组的前7个记录;对于每月,取30个记录;或者对于每3个月,取90天的记录:

{
  "$project": {
    "_id": 0,
    "Date": 1,
    "Temperature": 1,
    "Previous Values": {
      "$slice": ["$Previous Values", 0, 90]
    }
  }
},

为了对先前的温度进行平均,我们需要展开“Previous Values”数组,然后按日期字段进行分组。展开操作会执行以下步骤:
{"Day": "2017-02-01", 
"Temperature": 40, 
"Previous Values": {
        "Day": "2017-01-31", 
        "Temperature": 33}
},

{"Day": "2017-02-01", 
"Temperature": 40, 
"Previous Values": {
        "Day": "2017-01-30", 
        "Temperature": 36}
},

{"Day": "2017-02-01", 
"Temperature": 40, 
"Previous Values": {
        "Day": "2017-01-29", 
        "Temperature": 33}
},
...

注意看到 Day 字段是相同的,但现在我们有了 Previous Values 数组中每个之前日期的文档。现在我们可以按天分组,然后对 Previous Values.Temperature 取平均值以获得移动平均值:

{"$group": {
    "_id": {
      "Day": "$Date",
      "Temperature": "$Temperature"
    },
    "3 Month Moving Average": {
      "$avg": "$Previous Values.Temperature"
    }
  }
}

没错!我知道将每个记录与每个记录连接起来并不理想,但对于较小的数据集来说,这样做效果还不错。

0

我想我可能已经有了自己的问题的答案。Map Reduce 可以做到这一点。首先使用 emit 将每个文档映射到它应该与之平均的邻居,然后使用 reduce 平均每个数组... 并且那个新的平均数组应该是随时间变化的移动平均线,因为它的 ID 将是您关心的新日期间隔

我想我需要更好地理解 map-reduce...

:)

例如... 如果我们想在内存中执行它(稍后我们可以创建集合)

GIST https://gist.github.com/mrgcohen/3f67c597a397132c46f7

看起来对吗?


好的,我做了一些微调,但我相信整体思路是正确的。你可能需要根据自己的需求微调时间间隔,但它应该可以工作。 - mc.
问题是,当处理大型数据集时,这个程序运行速度会有多快,听起来MongoDB除非开始分片,否则速度会很慢...这里的最佳实践是什么?请帮忙。 - mc.

-1

我不相信集合框架在当前版本(2.6)下能够针对多个日期完成此操作,或者至少不能在没有一些严重的技巧的情况下完成。原因是集合管道一次只处理一个文档,因此必须以某种方式创建包含前三个月相关信息的每天的文档。这将作为一个$group阶段,计算平均值,意味着前一阶段将产生每天记录的约90份副本,并带有一些可用于$group的区分键。

所以我不认为可以通过单个聚合来同时处理多个日期。如果有人找到了一种方法,即使它非常复杂而不实用,我会很高兴地修改/删除这个答案。PostgreSQL的PARTITION类型函数可以在这里完成工作;也许将来会添加该函数。


那么你就必须查询这些数据的一部分,并在某种语言(如Ruby、Python、Node)中计算移动平均值,或者为每个时间间隔运行聚合查询...这真的是最好的解决方案吗?这不感觉很奇怪吗?有没有更好的使用Map-Reduce的方法,我想不到呢? - mc.
我还没有考虑过Map-Reduce。通常情况下,我会尽量避免使用Map-Reduce,因为它有显著的性能惩罚,并且不太安全,因为你需要在服务器端运行自定义代码。我会尝试思考一下这个问题,或者也许其他人会提出一个M/R解决方案。 - wdberkeley
你肯定应该能够做M/R,但我现在没有时间去解决它。我会尽力去处理并更新我的答案。 - wdberkeley
自2016年12月v3.4版本以来,这已不再是一个问题;请参见上面的$map/$range示例。 - Buzz Moschetti

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接