MongoDB唯一值聚合通过Map Reduce

4

我看到stackoverflow上有很多关于MongoDB聚合的问题,但是我还没有找到完整的解决方案。

这里是我的数据示例:

{
    "fruits" : {
        "apple" : "red",
        "orange" : "orange",
        "plum" : "purple"
    }
}
{
    "fruits" : {
        "apple" : "green",
        "plum" : "purple"
    }
}
{
    "fruits" : {
        "apple" : "red",
        "orange" : "yellow",
        "plum" : "purple"
    }
}

现在,我的目标是确定每种水果的每种颜色的受欢迎程度,因此输出集合可能如下所示:
{
    "_id" : "apple"
    "values" : {
        "red" : 2,
        "green" : 1
    }
}
{
    "_id" : "orange"
    "values" : {
        "orange" : 1,
        "yellow" : 1
    }
}
{
    "_id" : "plum"
    "values" : {
        "purple" : 3
    }
}

我尝试了各种M/R函数,但最终它们要么不起作用,要么花费指数级的时间。在这个例子(水果)的背景下,我有大约1,000种不同的水果和100,000种颜色,总共约有10,000,000个文档。我的当前工作M/R如下:

map = function() {
    if (!this.fruits) return;
    for (var fruit in this.fruits) {
        emit(fruit, {
            val_array: [
                {value: this.fruits[fruit], count: 1}
            ]
        });
    }
};

reduce = function(key, values) {
    var collection = {
        val_array: []
    };
    var found = false;
    values.forEach(function(map_obj) {
        map_obj.val_array.forEach(function(value_obj) {
            found = false;
            // if exists in collection, inc, else add
            collection.val_array.forEach(function(coll_obj) {
                if (coll_obj.value == value_obj.value) {
                    // the collection already has this object, increment it
                    coll_obj.count += value_obj.count;
                    found = true;
                    return;
                }
            });
            if (!found) {
                // the collection doesn't have this obj yet, push it
                collection.val_array.push(value_obj);
            }
        });
    });
    return collection;
};

现在,这确实起作用了,对于100条记录,只需要大约一秒钟左右的时间,但是时间增长是非线性的,因此100M条记录需要很长时间。问题在于我在reduce函数中使用“collection”数组进行了一种简易的子聚合,因此需要迭代“collection”和我的map函数的值。现在我只需要找出如何高效地做到这一点(即使需要多次缩减)。欢迎任何建议!
编辑 由于没有更好的发布位置,这是我的解决方案。 首先,我创建了一个名为“mr.js”的文件:
map = function() {
    if (!this.fruits) return;
    var skip_fruits = {
        'Watermelon':1,
        'Grapefruit':1,
        'Tomato':1 // yes, a tomato is a fruit
    }
    for (var fruit in this.fruits) {
        if (skip_fruits[fruit]) continue;
        var obj = {};
        obj[this.fruits[fruit]] = 1;
        emit(fruit, obj);
    }
};

reduce = function(key, values) {
    var out_values = {};
    values.forEach(function(v) {
        for(var k in v) { // iterate values
            if (!out_values[k]) {
                out_values[k] = v[k]; // init missing counter
            } else {
                out_values[k] += v[k];
            }
        }
    });
    return out_values;
};

var in_coll = "fruit_repo";
var out_coll = "fruit_agg_so";
var total_docs = db[in_coll].count();
var page_size = 100000;
var pages = Math.floor(total_docs / page_size);
print('Starting incremental MR job with '+pages+' pages');
db[out_coll].drop();
for (var i=0; i<pages; i++) {
    var skip = page_size * i;
    print("Calculating page limits for "+skip+" - "+(skip+page_size-1)+"...");
    var start_date = db[in_coll].find({},{date:1}).sort({date:1}).skip(skip).limit(1)[0].date;
    var end_date = db[in_coll].find({},{date:1}).sort({date:1}).skip(skip+page_size-1).limit(1)[0].date;
    var mr_command = {
        mapreduce: in_coll,
        map: map,
        reduce: reduce,
        out: {reduce: out_coll},
        sort: {date: 1},
        query: {
            date: {
                $gte: start_date,
                $lt: end_date
            }
        },
        limit: (page_size - 1)
    };
    print("Running mapreduce for "+skip+" - "+(skip+page_size-1));
    db[in_coll].runCommand(mr_command);
}

那个文件迭代遍历整个集合,每次递增地对10万篇文档进行映射/归约操作(按date排序,必须有索引!),并将它们减少到单个输出集。使用方法如下:mongo db_name mr.js
然后,几个小时后,我得到了一个包含所有信息的集合。为了找出哪些水果有最多的颜色,我在mongo shell中使用以下命令打印前25个结果:
// Show number of number of possible values per key
var keys = [];
for (var c = db.fruit_agg_so.find(); c.hasNext();) {
    var obj = c.next();
    if (!obj.value) break;
    var len=0;for(var l in obj.value){len++;}
    keys.push({key: obj['_id'], value: len});
}
keys.sort(function(a, b){
    if (a.value == b.value) return 0;
    return (a.value > b.value)? -1: 1;
});
for (var i=0; i<20; i++) {
    print(keys[i].key+':'+keys[i].value);
}

这种方法的真正酷处在于它是增量式的,因此我可以在mapreduce正在运行时处理输出数据。
2个回答

8

看起来你并不真正需要 val_array。为什么不使用一个简单的哈希表呢?试试这个:

map = function() {
    if (!this.fruits) return;
    for (var fruit in this.fruits) {
        emit(fruit, 
             {this.fruits[fruit]: 1});
    }
};

reduce = function(key, values) {
  var colors = {};

  values.forEach(function(v) {
    for(var k in v) { // iterate colors
      if(!colors[k]) // init missing counter
        colors[k] = 0

      color[k] += v[k];
    }
  });

  return colors;
}

哇,我真的想太多了,不是吗!这确实完全符合我的要求。我测试了100、1,000和100,000条记录,每组运行速度约为20k/sec(在这些大小下显然是线性的)。我现在正在运行完整的10M记录,我可以看到随着映射数据批次变大,将它们缩小需要的时间明显更长(colors对象必须增长):“secs_running”:488,“msg”:“m/r:(1/3)发出阶段383999/10752083 3%”。 - SteveK
顺便说一下,我无法使用 emit(fruit, {this.fruits[fruit]: 1});,因为键是动态生成的,所以我使用了这个 JS hack:var obj = {}; obj[this.fruits[fruit]] = 1; emit(fruit, obj); - SteveK
我建议尝试分批处理。也就是说,将文档分批处理,每批处理100k(或其他数量),然后在一个最终的任务中进行合并。这可能有些棘手,所以如果只是一次性的需求,我不会费心去实现它。 :) - Sergio Tulentsev
@SteveK:那不是黑客攻击。 :) - Sergio Tulentsev
坏消息,就像我们怀疑的那样,我的数据太大了,无法使用这个M/R进行处理。作业已经运行了几个小时,如果剩下的工作是线性的话,预计完成时间是2053年4月24日星期四08:53:10 :P。看起来我可以高效地处理100k批次,所以我想我会选择这种方式!我想我需要将数据M/R到不同的集合中,然后编写一个脚本来组合结果,或者我会分别对每种不同的水果进行M/R。感谢您的帮助! - SteveK

0

很抱歉告诉您,但 MongoDB MapReduce 框架非常缓慢,并且在“相当长的一段时间内”可能仍然如此(我不认为改进在他们的路线图上)。

简单地说,我的答案是我不会使用 Mongo-MapReduce 进行该操作,而是专注于利用 The New Aggregation Framework 进行实现: http://docs.mongodb.org/manual/reference/aggregation/

或在其之上运行 Hadoop: http://www.slideshare.net/spf13/mongodb-and-hadoop(简单易懂的介绍)

我在使用实现的 MapReduce 功能时也遇到了 MongoDB 缓慢的问题,我的结论是,即使在执行最简单的任务时,它的性能也无法与上述两种解决方案相比。使用新的聚合框架可以轻松处理每秒>1M 的文档(甚至更多),并且只需普通硬件即可。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接