Node.js:使用async.each异步处理大量元素会变得过慢

3

我有一台配备4GB内存的系统。我需要按照以下方式处理一组200个文件(平均文件大小=20MB):

  • 从gridfs中读取每个文件
  • 提取一些信息
  • 将信息存储到mongoDB的某个集合中

现在,完成同样任务的代码如下:

async.each(files, function (file, callback){

    console.log("reading file", file._id);

    readstream[file._id] = db.gfs().createReadStream({
        _id: file._id
    });

    readstream[file._id].on('data', function (chunk) {
        part[file._id] = part[file._id] && (part[file._id] + chunk.toString()) || chunk.toString();
    });

    readstream[file._id].on('end', function(){

        //  do something here 

    });

}, function (err){
    if(err){
        console.error("error ", err);
        res.json(err);
    }               
    else{
        console.log("saved all files ############ YIPPIEEEEEEEEEEEEE ###################");
        res.json({"status": 1});
    }
});

它对于10个文件的处理效果非常好。当文件数量很大(我这里有200个)时,由于内存限制,它会变得非常慢。

目前,我可以一次处理10个文件,并且可以接受,因为这是一次性活动。但我想知道在生产环境中解决这种情况的标准做法是什么?


这些文件有多大?如果是内存问题,那么最终您需要查看硬件,扩展而不是升级将是正确的方法。 - James
2
"async.each"会将所有任务并行执行,建议使用"async.eachSeries"代替,它会逐个执行任务。如果您仍需要并行执行,请尝试使用"async.cargo",它基本上是"each"和"eachSeries"的组合。 - Ma'moon Al-Akash
1
也许你可以重构代码,使用 async 函数 eachLimit link 或 parallelLimit? link。这将并行执行指定数量的任务。例如,你可以将其限制为 10,它将运行前 10 个任务,然后每当其中一个完成时,它就会启动另一个任务,直到所有任务都完成。 - RoyHB
@Ma'moonAl-Akash:async.eachSeries解决了问题。谢谢!!! :-) 您可以将其发布为答案,以便我可以关闭此问题。 - Mandeep Singh
我将把这个作为答案发布,以便其他人可以受益,请接受它以表示它有助于解决问题。 - Ma'moon Al-Akash
2个回答

2
问题在于并行执行,因为async.each会并行执行所有任务,可以使用async.eachSeries逐个执行任务作为解决方法。您还可以考虑使用async.cargo来合并多个任务的执行。请注意保留HTML标签。

仅供记录,虽然它比async.each表现得更好,但当文件数量和每个文件的大小很大时(大约100个文件每个30 MB),即使这样也没有帮助。 - Mandeep Singh
尝试使用 async.cargo 并分享您的体验。 - Ma'moon Al-Akash
我的错,现在它完美地运行了。还有一个嵌套的async.each也会导致问题。现在将其更改为async.eachSeries。自从1小时以来一切都很顺利。 - Mandeep Singh

0
你也可以使用async.eachLimit。这里的limit是同时进行异步操作的最大数量。这将限制异步运行N个任务并行。

Async文档:

与each相同,但每次最多运行limit个异步操作。

示例(未经测试,但这是一次处理10个文件的想法):

filesContent = []
async.eachLimit(fileNamesArray, 10, function(fileName, callback) {
    // Process a file
    processFile(fileName, (err, content) => {
        filesContent.push(content);
        callback(err);
    });
}, function(err) {
    // if any of tasks produced an error, err would equal that error
    if( err ) {
      console.log('A file failed to process');
    } else {
      console.log('All files have been processed successfully');
    }
});

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接