MongoDB对30亿个文档进行不重复聚合

4

我是一名有用的助手,可以为您进行文本翻译。

我有一个包含30亿个文档的庞大集合。每个文档的格式如下:

"_id" : ObjectId("54c1a013715faf2cc0047c77"),
"service_type" : "JE",
"receiver_id" : NumberLong("865438083645"),
"time" : ISODate("2012-12-05T23:07:36Z"),
"duration" : 24,
"service_description" : "NQ",
"receiver_cell_id" : null,
"location_id" : "658_55525",
"caller_id" : NumberLong("475035504705")

我想获取不同用户列表(至少作为“caller_id”出现一次的用户),他们的计数(每个用户作为呼叫者或接收者出现的次数)以及如果他们是呼叫者,则位置的计数(即每个用户的每个location_id的计数)。
我希望最终得到以下结果:
"number_of_records" : 20,
"locations" : [{location_id: 658_55525, count:5}, {location_id: 840_5425, count:15}],
"user" : NumberLong("475035504705")

我尝试了这里这里描述的解决方案,但它们不够高效(非常慢)。有什么更有效的方法来实现这个目标?


当你说“非常慢”时,你指的是多慢?你让它运行了多长时间? - Kyle Booth
2
你能否请发布一下你尝试过的两种解决方案以及哪一种更好一些呢?这将会很有用,因为你可能不希望得到与你已经尝试过的相同的答案。 - BatScream
1
目前还没有足够的信息来猜测你的资源限制可能会如何影响性能。你使用了什么样的部署方式(独立/复制品/分片)和服务器资源(内存/硬盘/CPU),以及数据的总大小是多少?你能否也包括这个集合的索引?最后,你使用的MongoDB和操作系统的具体版本是什么? - Stennie
1
也很有用的是知道您是在寻找一次性解决方案还是这是一个持续的查询需求。 - Stennie
部署是独立的,使用32GB内存的Ubuntu服务器。Mongo版本为2.6.7。数据库大小为599.661GB,并且我在caller_id、receiver_id、time和location_id上建立了索引。该解决方案是一次性的。 - amaatouq
我非常惊讶你在24小时的运行时间内还没有找到解决方案。 - user3467349
2个回答

2

使用聚合来获得您的结果:

db.<collection>.aggregate([
   { $group : { _id : { user:  "$caller_id", localtion: '$location_id'} , count : { $sum : 1}  } },
   { $project : { _id : 0, _id : '$_id.user', location : '$_id.localtion', count : '$count' } },
   { $group : { _id : '$_id', 'locations' : { $push : { location_id : '$location', count : '$count' } }, number_of_records : {$sum : '$count'} } },
   { $project : { _id : 0, user : '$_id', locations : '$locations', number_of_records : '$number_of_records'} },
   { $out : 'outputCollection'},
])

输出结果将会是:
{
    "0" : {
        "locations" : [ 
            {
                "location_id" : "840_5425",
                "count" : 8
            }, 
            {
                "location_id" : "658_55525",
                "count" : 5
            }
        ],
        "number_of_records" : 13,
        "user" : NumberLong(475035504705)
    }
}

使用 allowDiskUse 更新:

var pipe = [
   { $group : { _id : { user:  "$caller_id", localtion: '$location_id'} , count : { $sum : 1}  } },
   { $project : { _id : 0, _id : '$_id.user', location : '$_id.localtion', count : '$count' } },
   { $group : { _id : '$_id', 'locations' : { $push : { location_id : '$location', count : '$count' } }, number_of_records : {$sum : '$count'} } },
   { $project : { _id : 0, user : '$_id', locations : '$locations', number_of_records : '$number_of_records'} },
   { $out : 'outputCollection'},
];

db.runCommand(
   { aggregate: "collection",
     pipeline: pipe,
     allowDiskUse: true
   }
)

我想要一个集合,其中所有用户都具有指定的输出。你的解决方案只适用于一个用户(即475035504705),对吗? - amaatouq
删除$match代码行。但是没有数据库可以在3B条目的情况下快速执行此查询。 - Disposer
我会尝试这个...如果需要几天时间也没关系!因为这只是一次性的事情! - amaatouq
我收到了以下错误信息:"errmsg" : "exception: Exceeded memory limit for $group, but didn't allow external sort. Pass allowDiskUse:true to opt in.", "code" : 16945, "ok" : 0 - amaatouq
你不应该需要两次分组和投影。 - user3467349
显示剩余3条评论

1
一个map-reduce方案比一个aggregation管道更适合这里,因为它避免了两个unwind。如果你能提出一个只有一个unwind的聚合方案,那就太好了。但下面的map-reduce方案是一种方法,尽管你需要测量其对大数据的运行时间,并看看它是否适合你。 map函数:
var map = function(){
    emit(this.caller_id,
        {locs:[{"location_id":this.location_id,"count":1}]});
}

reduce 函数:

var reduce = function(key,values){
    var result = {locs:[]};
    var locations = {};
    values.forEach(function(value){
        value.locs.forEach(function(loc){
                if(!locations[loc.location_id]){
                    locations[loc.location_id] = loc.count;
                }
                else{
                    locations[loc.location_id]++;
                }
        })
    })
    Object.keys(locations).forEach(function(k){
        result.locs.push({"location_id":k,"count":locations[k]});
    })
    return result;
}

finalize函数:

var finalize = function(key,value){
    var total = 0;
    value.locs.forEach(function(loc){
        total += loc.count;
    })
    return {"total":total,"locs":value.locs};
}

调用 map-reduce:
db.collection.mapReduce(map,reduce,{"out":"t1","finalize":finalize});

在 Map-Reduce 生成输出后对结果进行汇总。

db.t1.aggregate([
{$project:{"_id":0,
           "number_of_records":"$value.total",
           "locations":"$value.locs","user":"$_id"}}
])

示例输出:

{
        "number_of_records" : 3,
        "locations" : [
                {
                        "location_id" : "658_55525",
                        "count" : 1
                },
                {
                        "location_id" : "658_55525213",
                        "count" : 2
                }
        ],
        "user" : 2
}
{
        "number_of_records" : 1,
        "locations" : [
                {
                        "location_id" : "658_55525",
                        "count" : 1
                }
        ],
        "user" : NumberLong("475035504705")
}

地图-减少的Java脚本代码应该是自解释的。

感谢@BatScream的回答。然而,我正在尝试这段代码已经运行了5天,仍在运行中。该方法似乎非常缓慢! - amaatouq

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接