控制JavaScript异步流程的速率(在循环中)

5
假设您想在短代码中为列表中的每个文件夹启动一个(随机)进程:
var exec = require('child_process').exec;
var folders = [...]; // a list from somewhere

_.each(folders, function(folder) {
    exec("tar cvf " + folder + ".tgz " + folder);
});

如果列表很长,我可能会同时运行大量进程,需要避免这种情况。有没有一种相当简单的方法来控制执行速率(这里最多只能并发处理5个进程)?
编辑:该问题适用于所有类型的异步流程(您希望在其中控制速率),而不仅仅是 exec-over-folders 问题。
3个回答

6

使用async包及其函数:eachLimit

它与lodash相同,但具有异步流处理功能,并使迭代在一次不超过限制的情况下运行:

var async = require('async');
var exec = require('child_process').exec;
var folders = [...]; // a list from somewhere

var maxProcesses = 5; // 5 items at a time
async.eachLimit(
  folders, // collection
  maxProcesses, // limit
  function(folder, next) { // iterator function. args: item, callback
    var cmd = "tar -cf " + folder + ".tgz " + folder;
    console.log('calling:', cmd);
    exec(cmd, function(err, stdOut, stdErr) { // executing cmd
      if(err) console.error(err); // if error putting to console
      next(); // passing the async flow to handle the next iteration
    });
  },
  function() { // after all iterations finished
    console.log('finished processing commands');
  });

使用parallelLimit可以实现同样的效果:

var async = require('async');
var _ = require('lodash');
var exec = require('child_process').exec;
var folders = [...]; // a list from somewhere

var callStack = [];
_.each(folders, function(folder) { // generating our stack of commands
  callStack.push(function(done) {
    var cmd = "tar -cf " + folder + ".tgz " + folder;
    exec(cmd, function(err, stdOut, stdErr) {
      if(err) console.error(err);
      done(null, folder);
    });
  });
});

var maxProcesses = 5; // 5 items at a time
async.parallelLimit(callStack, maxProcesses, function() {console.log('finished');});

"使其看起来更短" :)
const
  async = require('async'),
  exec = require('child_process').exec;

let folders = [...]; 
async.eachLimit(folders, 5, 
  (folder, next) => 
    exec("tar -cf " + folder + ".tgz " + folder, () => next()),
    () => console.log('finished'));

并且。
const
  async = require('async'),
  exec = require('child_process').exec;

let folders = [...]; 
let commands = folders.map(folder => done => exec("tar -cf " + folder + ".tgz " + folder, () => done());
async.parallelLimit(commands, 5, () => console.log('finished'));

如果这些示例中的任何一个不适合您,或者您的系统非常庞大,那么让我们尝试使用消息队列系统,例如rsmq

1
我认为grunt或gulp也可能有适用于这种情况的插件。 - Paul
1
到目前为止,这是最好的答案,我真的很喜欢异步如何处理问题,同时保持事情异步。 - Sebastien
1
这并不是关于问题本身(每个有一点经验的程序员都能处理这个问题)。更多的是关于模式。我现在遇到了这个问题很多次,每次用20多行代码来解决它,感觉不太对。我想要一个更稳定和规范的模式,这就是为什么设置赏金的原因。 - Sebastien
@Sebastien 我已经更新了我的答案,使用了压缩版本 (: - num8er
1
哈哈,你其实不用这么做。你已经给出了目前最好的答案,而且我主要是在评估可读性、相关性和性能(异步方案都很好)。我只是想再等几天,以防有人提出更优雅的解决方案(虽然我觉得可能性不大)。 - Sebastien
显示剩余5条评论

2

承诺

我非常喜欢使用承诺并尽可能地将它们应用到各个方面。

这里有一个解决方案,我认为可以适用于您的情况。

var exec = require('child_process').exec;
var folders = ["1", "2", "3", "4", "5", "6", "7", "8", "9", "10"];
var maxConcurrentProcessCount = 5;
var promiseArr = [];

folders.forEach(function (folder) {
    var pr = {
        start: function () {
            if (pr.promise) return pr.promise;
            return pr.promise = new Promise(function (resolve) {
                exec("tar cvf " + folder + ".tgz " + folder,
                  undefined, (err, stdout, stderr) => {
                      // This is your logic, you can reject depending on err
                      var ind = promiseArr.indexOf(pr);
                      if (ind >= 0) promiseArr.splice(ind, 1);
                      resolve(stdout);
                  });
            });
        }
    };
    promiseArr.push(pr);
});

var racePromises = function () {
    if (!promiseArr.length) return;
    Promise.race(promiseArr.slice(0, maxConcurrentProcessCount).map(x => x.start())).then(racePromises);
    console.log("Current running process count: " + promiseArr.filter(x => x.promise).length);
}
racePromises();

简要说明

创建一个数组,其中每个元素代表一个任务。首先选择其中的5个并启动它们。每当其中一个完成时,将其从数组中删除,并再次从数组中启动5个任务。

示例运行

测试示例

只是为了好玩而使用 Promise 重新创建 eachLimit

var myEachLimit = function (collection, maxConcurrentCalls, callback) {
    return new Promise(function (resolve, reject) {
        var promiseArr = [];

        collection.forEach(function (item) {
            var pr = {
                start: function () {
                    if (pr.promise) return pr.promise;
                    return pr.promise = new Promise(function (resolve) {
                        callback.call(item, item, function () {
                            var ind = promiseArr.indexOf(pr);
                            if (ind >= 0) promiseArr.splice(ind, 1);
                            resolve();
                        });

                    });
                }
            };
            promiseArr.push(pr);
        });

        var racePromises = function () {
            if (!promiseArr.length) {
                resolve();
                return;
            }
            Promise.race(promiseArr.slice(0, maxConcurrentProcessCount).map(x => x.start())).then(racePromises);
            console.log("Current running process count: " + promiseArr.filter(x => x.promise).length);
        }
        racePromises();
    });
}


// Demo

var exec = require('child_process').exec;
var folders = ["1", "2", "3", "4", "5", "6", "7", "8", "9", "10"];
var maxConcurrentProcessCount = 5;

myEachLimit(folders, maxConcurrentProcessCount, function (folder, next) {
    exec("tar cvf " + folder + ".tgz " + folder, (err, stdout, stderr) => {
        next();
    });
}).then(function () {
    console.log("Finished all processes");
});

有趣的方式,但你不会超过控制数字吗?在racePromises的第二次运行中,您最多可以拥有9个进程...我认为我们应该在这里更改racePromises以接受一个可选的nbProcess参数。 - Sebastien
@Sebastien,它仍将运行5个进程。只有当进程完成时,才会从数组中删除该进程。因此,在第二次运行racePromises时,当您从数组中获取5个进程时,其中4个来自上一次运行。 - Gokhan Kurt
承诺很好,但@GökhanKurt说实话,您的代码有太多额外的操作,比如检查数组、映射、嵌套太深(http://joxi.ru/eAO55BF41Kbemo)-使JS代码看起来很丑。我知道`Promise`是最新的nodejs版本中附带的,它可能比额外的`async`包更快。但总体上,大约100毫秒的速度差异并没有太大的作用。 - num8er
1
@num8er 是的,承诺并不是专门为这项工作而设计的,很难理解等等。但这只是使用承诺的一种方式。实际上,您完全可以编写一个代码片段/函数,将其记录一次并用于所有情况。事实上,可以使用承诺编写类似于异步库中的eachLimit的函数。 - Gokhan Kurt
1
我很印象深刻。为你的努力点赞!虽然我仍然会依赖于库来简化任务,但那段代码帮助我更好地理解了Promise。绝对不是浪费。 - Sebastien

1

本地Javascript

你所需要的就是一种负载均衡器。将你的循环放入一个单独的函数中,如下所示:

  /**
  * Loops through your Folderarray and begins at the given index.
  * @param  {[type]} lastIndex       [last index which your loop stopped.]
  * @param  {[type]} maxProcesses    [maximum of processes you want to have.]
  * @param  {[type]} folderArray     [folder array.]
  */
  function loopFolders(maxProcesses, lastIndex, folderArray){

    // counter for our actual processes.
    let processes = 0;
    // This is to stop the loop, since JavaScript has no built-in break for loops.
    let maximumReached = false;

    // loop through array.
    folderArray.forEach(function(element, index, array){

      // Do stuff once you hit the last point.
      if(index > lastIndex && !maximumReached){

        // Check how many processes are running.
        if(processes <= maxProcesses){

          // create your child process.
          let exec = require('child_process').exec;
          // Do stuff with Folderelement from Folderarray.
          exec("tar cvf " + element + ".tgz " + element);

          // Increment processes.
          processes++;

        }else{
          /**
           * Magic begins here:
           * If max processes was reached, retry after a while.
           */

          // We are done for this loop.
           maximumReached = true;

           // Set timeout for 10 seconds and then retry.
          setTimeout(function(){
            // Calll function again.
            loopFolders(maxProcesses, index, array);
          }, 10000);
        }

      }

    });

  }

要从头开始调用此循环,只需按照以下步骤进行:
// your Array of folders from somewhere.    
let folders = [...];
// Call loopFolders with maximum of 5 and the beginning index of 0.
loopFolders(5, 0, folders);

这段代码是一个很基本的负载均衡器示例。请注意,我的示例永远不会知道其他进程是否完成。您可以使用某种回调来确保。但至少可以开始使用这个东西。
要使用NodeJS Childprocess事件,请参阅 https://nodejs.org/api/child_process.html 您可以在“退出”事件中为循环设置回调,以确保您的子进程不会失控。
希望有所帮助。 问候, Megajin

你如何控制执行的结束?如果tar工作时间更长,10秒后开始另一部分循环而没有完成之前的循环会怎么样?这将在后台创建一个巨大的归档命令列表,并且exec()也将失去控制。 - num8er
1
@num8er 这就是为什么我说这只是一个非常基本的例子。肯定有必要从exec子进程中回调一些东西。 回调可以在其中一个子进程事件(如'exit')中完成。之前的循环将运行空,因为没有适用于循环的原生JavaScript break。使用我定义的索引,您可以从给定点重新开始。 - Megajin
是的,关于基础知识,我同意您的看法。 - num8er

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接