在Node.js中协调并行执行

79
node.js 的事件驱动编程模型使得协调程序流程有些棘手。
简单的顺序执行会被转换成嵌套回调,这很容易(虽然写起来有点费劲)。
但是并行执行怎么样呢?假设你有三个任务 A、B、C 可以并行运行,当它们完成时,你想将它们的结果发送给任务 D。
使用 fork/join 模型可以实现:
  • fork A
  • fork B
  • fork C
  • join A,B,C, run D
如何在 node.js 中实现呢?是否有最佳实践或食谱?我是否每次都需要手工解决,还是有一些帮手的库? 请注意:本翻译保留了原文中 "{{" 和 "" 符号以及 HTML 标签,请勿删除。}}
7个回答

129

由于Node.js是单线程的,因此在它内部不存在真正的并行。但是,可以安排多个事件以一个无法预先确定的顺序运行。有些事情,比如数据库访问,实际上是“并行”的,因为数据库查询本身在单独的线程中运行,但在完成后会重新集成到事件流中。

那么,如何在多个事件处理程序上安排回调函数呢?嗯,在浏览器端JavaScript动画中,这是一种常见的技术:使用变量来跟踪完成情况。

这听起来像是一种hack,而且它确实是。在其他不那么好的语言中,可能会留下一堆全局变量进行跟踪,并且可能会变得非常混乱。但是,在JavaScript中,我们可以使用闭包:

function fork (async_calls, shared_callback) {
  var counter = async_calls.length;
  var callback = function () {
    counter --;
    if (counter == 0) {
      shared_callback()
    }
  }

  for (var i=0;i<async_calls.length;i++) {
    async_calls[i](callback);
  }
}

// usage:
fork([A,B,C],D);
在上面的示例中,我们假设异步和回调函数不需要任何参数来保持代码简单。当然,您可以修改代码以向异步函数传递参数,并使回调函数累积结果并将其传递给共享回调函数。

额外的答案:

实际上,即使现在这样,fork()函数已经可以使用闭包向异步函数传递参数:

fork([
  function(callback){ A(1,2,callback) },
  function(callback){ B(1,callback) },
  function(callback){ C(1,2,callback) }
],D);

现在唯一要做的就是累加来自A、B、C的结果并将它们传递给D。


更多额外的答案:

我忍不住了。早餐期间一直在想这个问题。这里是一个实现 fork() 并累加结果(通常作为回调函数的参数传递)的示例:

function fork (async_calls, shared_callback) {
  var counter = async_calls.length;
  var all_results = [];
  function makeCallback (index) {
    return function () {
      counter --;
      var results = [];
      // we use the arguments object here because some callbacks 
      // in Node pass in multiple arguments as result.
      for (var i=0;i<arguments.length;i++) {
        results.push(arguments[i]);
      }
      all_results[index] = results;
      if (counter == 0) {
        shared_callback(all_results);
      }
    }
  }

  for (var i=0;i<async_calls.length;i++) {
    async_calls[i](makeCallback(i));
  }
}

这相当容易。这使得fork()相当通用,可用于同步多个非同构事件。

在Node.js中的示例用法:

// Read 3 files in parallel and process them together:

function A (c){ fs.readFile('file1',c) };
function B (c){ fs.readFile('file2',c) };
function C (c){ fs.readFile('file3',c) };
function D (result) {
  file1data = result[0][1];
  file2data = result[1][1];
  file3data = result[2][1];

  // process the files together here
}

fork([A,B,C],D);

更新

这段代码是在像async.js或诸如基于Promise的库之类的库出现之前编写的。我想相信async.js受到了这个代码的启发,但我没有任何证据可以证明这一点。无论如何...如果你今天想要做这个,请看看async.js或promises。只需将上面的答案视为说明/演示异步操作如何工作的好方法。

为了完整起见,以下是使用async.parallel的方法:

var async = require('async');

async.parallel([A,B,C],D);
请注意,async.parallel 的工作方式与我们上面实现的 fork 函数完全相同。主要区别在于它将错误作为第一个参数传递给 D,将回调函数作为第二个参数按照 Node.js 约定传递。
使用 promises,我们可以编写如下代码:
// Assuming A, B & C return a promise instead of accepting a callback

Promise.all([A,B,C]).then(D);

12
“在Node.js中没有真正的并行,因为它是单线程的。”这并不正确。所有不使用CPU(例如等待网络I/O)的操作都可以并行运行。 - Thilo
3
大部分情况下这是正确的。在Node中等待IO不会阻塞其他代码运行,但当代码运行时,它是一次只执行一个。在Node中唯一真正的并行执行来自生成子进程,但这也可以说几乎适用于任何环境。 - MooGoo
6
通常情况下,我们将不使用CPU的代码称为未运行。如果你没有在运行,就不能够“并行运行”。 - slebetman
4
这句话的意思是,在处理事件时,因为我们知道它们绝对不能并行运行,所以我们不必担心信号量和互斥锁,但在处理线程时,我们必须锁定共享资源。 - slebetman
2
我说这些不是并行执行的函数,而是(最多)在未确定的顺序中执行,直到每个“async_func”返回代码才会继续进行,我的理解正确吗? - Aaron Rustad
显示剩余17条评论

10

我认为现在的“async”模块提供了这个并行功能,并且大致上与上面的fork函数相同。


2
这是不正确的,异步仅帮助您在单个进程中组织代码流程。 - bwindels
2
async.parallel确实做了与上面的fork函数大致相同的事情。 - Dave Stibrany
这不是真正的并行处理。 - rab

5
The futures模块有一个名为join的子模块,我喜欢使用它:

类似于对线程使用pthread_join,将异步调用连接起来。

自由风格或使用future子模块使用Promise模式可以在readme中显示一些很好的示例。文档中的示例:
var Join = require('join')
  , join = Join()
  , callbackA = join.add()
  , callbackB = join.add()
  , callbackC = join.add();

function abcComplete(aArgs, bArgs, cArgs) {
  console.log(aArgs[1] + bArgs[1] + cArgs[1]);
}

setTimeout(function () {
  callbackA(null, 'Hello');
}, 300);

setTimeout(function () {
  callbackB(null, 'World');
}, 500);

setTimeout(function () {
  callbackC(null, '!');
}, 400);

// this must be called after all 
join.when(abcComplete);

2

它似乎并不像step一样进行真正的并行处理。 - Evan Leis

2
这里可能有一个简单的解决方案:http://howtonode.org/control-flow-part-ii,请滚动到“Parallel actions”部分。另一种方法是让A、B和C共用同一个回调函数,让该函数具有全局或至少是函数外的增量器,如果这三个函数都调用了回调,则让它运行D,当然你还需要在某处存储A、B和C的结果。请注意保留HTML标签。

0
除了流行的 Promises 和 Async 库之外,还有第三种优雅的方式 - 使用“wiring”:
var l = new Wire();

funcA(l.branch('post'));
funcB(l.branch('comments'));
funcC(l.branch('links'));

l.success(function(results) {
   // result will be object with results:
   // { post: ..., comments: ..., links: ...}
});

https://github.com/garmoshka-mo/mo-wire


0

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接