使用MongoDB聚合操作实现从事件集合中创建线性漏斗,这是否可行?

3
我有许多事件文档,每个事件都有若干字段,但与我的查询相关的字段是:
- person_id - 触发事件的人员的引用 - event - 用于识别事件的字符串键 - occurred_at - 事件发生的UTC时间
我的目标是:
- 对于事件键列表(例如 `['event_1','event_2', 'event_3']`),获取执行每个事件以及之前所有事件的人数计数,按顺序排列。即: - 执行 event_1 的人数 - 执行 event_1 后,再执行 event_2 的人数 - 执行 event_1、event_2 再执行 event_3 的人数 - 等等 - 次要目标是能够获取每个事件的平均 occurred_at 日期,以便我可以计算每个事件之间的平均时间差。
我最好的解决方案是以下两个映射规约:
db.events.mapReduce(function () {
  emit(this.person_id, {
    e: [{
      e: this.event,
      o: this.occurred_at
    }]
  })
}, function (key, values) {
  return {
    e: [].concat.apply([], values.map(function (x) {
      return x.e
    }))
  }
}, {
  query: {
    account_id: ObjectId('52011239b1b9229f92000003'),
    event: {
      $in: ['event_a', 'event_b', 'event_c','event_d','event_e','event_f']
    }
  },
  out: 'people_funnel_chains',
  sort: { person_id: 1, occurred_at: 1 }
})

并且:
db.people_funnel_chains.mapReduce(function() {
  funnel = ['event_a', 'event_b', 'event_c','event_d','event_e','event_f']
  events = this.value.e;
  for (var e in funnel) {
    e = funnel[e];
    if ((i = events.map(function (x) {
      return x.e
    }).indexOf(e)) > -1) {
      emit(e, { c: 1, o: events[i].o })
      events = events.slice(i + 1, events.length);
    } else {
      break;
    }
  }
}, function(key,values) {
    return {
        c: Array.sum(values.map(function(x) { return x.c })),
        o: new Date(Array.sum(values.map(function(x) { return x.o.getTime() }))/values.length)
    };
}, { out: {inline: 1} })

我希望能够通过聚合框架实时实现这个目标,但是我找不到任何方法来做到。对于数万条记录,这需要花费10几秒的时间,虽然我可以以增量方式运行它,这意味着对于新数据来说足够快,但是如果我想修改原始查询(如更改事件链),则无法在单个请求中完成,我希望它能够完成。
使用Cursor.forEach(),我成功地在这方面取得了巨大的进展(基本上消除了第一个映射减少的要求)。
var time = new Date().getTime(), funnel_event_keys = ['event_a', 'event_b', 'event_c','event_d','event_e','event_f'], looking_for_i = 0, looking_for = funnel_event_keys[0], funnel = {}, last_person_id = null;
for (var i in funnel_event_keys) { funnel[funnel_event_keys[i]] = [0,null] };
db.events.find({
  account_id: ObjectId('52011239b1b9229f92000003'),
  event: {
    $in: funnel_event_keys
  }
}, { person_id: 1, event: 1, occurred_at: 1 }).sort({ person_id: 1, occurred_at: 1 }).forEach(function(e) {

  var current_person_id = e['person_id'].str; 

  if (last_person_id != current_person_id) {
    looking_for_i = 0;
    looking_for = funnel_event_keys[0]
  }

  if (e['event'] == looking_for) {
    var funnel_event = funnel[looking_for]
    funnel_event[0] = funnel_event[0] + 1;
    funnel_event[1] = ((funnel_event[1] || e['occurred_at'].getTime()) + e['occurred_at'].getTime())/2;
    looking_for_i = looking_for_i + 1;
    looking_for = funnel_event_keys[looking_for_i]
  }

  last_person_id = current_person_id;
})
funnel;
new Date().getTime() - time;

我想知道是否有一些自定义的内存数据处理方式可以改善这个问题?从MongoDB中获取数十万条记录并将其加载到内存中(在不同的计算机上)会成为瓶颈,是否有我不知道的技术可以解决这个问题?


这是聚合框架无法帮助的少数情况之一,说实话,我甚至无法看到这种情况实时发生,对于数据库来说,要高效地完成这项任务确实很困难。 - Sammaye
这个问题可能最好通过实现一些定制化的东西(例如自己的内存分析引擎)来解决,或者寻找其他数据库选项。 - WiredPrairie
我目前正在使用的解决方案是按person_id和occurred_at(带索引)对事件进行排序,然后使用Cursor.forEach()进行迭代。在我的MPB上,它可以在大约4秒钟内将25k人的362k个事件减少,比上面列出的map reduce快50多秒。我想知道是否有一些定制的东西能够改进这一点,因为从MongoDB中获取数十万条记录到内存中始终会成为瓶颈,是否有我不知道的技术可以做到这一点? - msaspence
每个人是否都会多次发生每个事件?当你说“按顺序”时,你只对第一个事件感兴趣吗?我认为这可以使用聚合框架来完成,实际上,至少有一部分/大部分可以 - 你可能需要两个聚合框架查询... - Asya Kamsky
@msaspence 看起来这个问题非常简单,只需要一个聚合管道就可以了。但是我不太确定你所说的平均时间是什么意思——我可以展示一个计算行动之间平均时间的例子。 - Asya Kamsky
顺便说一下,我在 40,000 篇文档集合上运行了我的示例,大约需要 200 毫秒。 - Asya Kamsky
1个回答

3
我在我的MongoDB博客上写了一篇完整的答案, 总结一下,你需要根据你关心的行动来制定计划,将行动字段的值映射到适当的键名中,按人分组聚合三个行动的执行时间(可选的是执行次数),然后投影新字段,检查是否在行动1之后完成了行动2,以及是否在行动2之后完成了行动3... 最后一阶段只需总结出只做了1个人的数量,或者先做1再做2的人数,或者先做1再做2再做3的人数。

使用一个函数生成聚合管道,可以基于传入的行动数组生成结果。

在我的测试用例中,对于40,000个文档的集合,整个管道运行时间不到200ms(这是在我的小笔记本电脑上)。

正如正确指出的那样,我描述的通用解决方案假设虽然演员可以多次执行任何操作,但他们只能从行动1进展到行动2,而不能直接从行动1跳过行动2而进行行动3(将行动顺序解释为描述先决条件的地方,您不能在完成行动2之前完成行动3)。

事实证明,即使在顺序完全任意的事件序列中,聚合框架仍然可以用于确定在某一时刻有多少人执行了序列动作1、动作2和动作3。
对原始答案进行的主要调整是在中间添加了额外的两个阶段步骤。此步骤展开了按人收集的文档,重新分组并找到第一个出现在第一个动作之后的第二个动作的发生时间。
一旦我们拥有了这个,最终比较就是对于动作1,其后是动作2的最早发生时间,并将其与动作3的最新发生时间进行比较。
它可能被概括为处理任意数量的事件,但每增加两个事件,聚合就会增加两个阶段。
这里是我的管道修改说明,以达到您所寻求的答案。

请纠正我,如果我错了,但是这个解决方案只有在每个事件的第一个实例按照您想要的顺序时才能起作用?例如,如果您正在寻找序列e1,e2,e3,并且某人已触发以下序列:e1,e3,e2,e3,它将仅被注册为他们已经到达。如果您可以假设在e2之前不能触发e3,那么这很好,不幸的是我们无法做出这种假设。 - msaspence
对我来说不清楚每个操作是否可以多次执行,以及您是否关心每个操作按严格顺序执行 - 最初我为每个操作都记录了第一次和最后一次使用时间以进行更复杂的比较。 - Asya Kamsky
如果您明确您的确切要求,我可以解释如何调整算法。如果您保留第一个和最后一个时间,您可以将动作的第一个时间与下一个动作的最后一个时间进行比较 - 您是否认为像e2、e1、e3、e2、e1、e2这样的序列满足您的要求?而e1、e3、e2、e3、e1、e3、e2、e1、e3呢?管道会稍微复杂一些,但仍然可行(尽管它对于传递的任意数量的动作序列的泛化程度会降低 - 当前变体适用于大于或等于一个动作的任意数量)。 - Asya Kamsky
顺便提一下,你所举的 e1、e3、e2、e3 的例子只有在没有“乱序执行”的概念时才应该考虑。如果你详细说明这些操作代表什么,可能会有所帮助——我之前应用这种技巧的具体示例特别关注“另一个操作之前的最后一个操作”类型的序列,而这似乎不是你的情况。 - Asya Kamsky
好的,但我们都同意只需要两个事件就可以轻松完成,对吧?(检查最早的事件1是否在最新的事件2之前?)我认为我可以在聚合框架中处理三个事件,但无法处理任意数量的事件。 - Asya Kamsky
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接