将大型数组传递给Node子进程

28

我有一个需要在大型数组上进行的复杂CPU密集型工作。理想情况下,我希望将其传递给子进程处理。

var spawn = require('child_process').spawn;

// dataAsNumbers is a large 2D array
var child = spawn(process.execPath, ['/child_process_scripts/getStatistics', dataAsNumbers]);

child.stdout.on('data', function(data){
  console.log('from child: ', data.toString());
});

但是当我这样做时,Node会报错:

spawn E2BIG

我看到了这篇文章,它似乎建议将数据通过管道传递给子进程。现在我的代码如下:

var spawn = require('child_process').spawn;

console.log('creating child........................');

var options = { stdio: [null, null, null, 'pipe'] };
var args = [ '/getStatistics' ];
var child = spawn(process.execPath, args, options);

var pipe = child.stdio[3];

pipe.write(Buffer('awesome'));

child.stdout.on('data', function(data){
  console.log('from child: ', data.toString());
});

然后在getStatistics.js中:

console.log('im inside child');

process.stdin.on('data', function(data) {
  console.log('data is ', data);
  process.exit(0);
});

但是在process.stdin.on里的回调没有被执行。我该如何在我的子进程获得一个流?

编辑

我不得不放弃缓冲区方法。现在我将数组作为消息发送:

var cp = require('child_process');
var child = cp.fork('/getStatistics.js');

child.send({ 
  dataAsNumbers: dataAsNumbers
});

但是只有在 dataAsNumbers 的长度低于约 20,000 时才有效,否则会超时。


该项目已经完成了90%,我现在不会更改使用Node。有很多文章解释了Node的高CPU使用率。 - Mark
2
通常,解决核心问题是开始一个项目的好方法。在多线程语言中,您不需要复制数据,因为线程共享内存。在这种情况下复制数据会减慢一切。除此之外,如果您将工作委托给libuv,Node就会很快。如果您计划使用Node的v8部分进行大量处理,则速度将不快。另外,如果由于任何原因这是实际服务器的一部分,则事件循环将被阻塞,I/O将饥饿,使得所有请求超时。 - arboreal84
为什么你不把它分成块发送,马克? - d9ngle
通常这个数组会有多少元素?另外,我假设它包含常规的JavaScript“数字”,我的理解正确吗? - rvighne
@rvighne 数组中最多可以有100万个条目,每个元素本身是一个数组,最多有20个条目。这些数组都是浮点数。 - Mark
显示剩余4条评论
7个回答

16

鉴于如此大量的数据,我建议考虑使用共享内存,而不是将数据复制到子进程中(这是在使用管道或传递消息时发生的情况)。这将节省内存,减少父进程的 CPU 时间,并且不太可能遇到某些限制。

shm-typed-array 是一个非常简单的模块,似乎适用于您的应用。例如:

parent.js

"use strict";

const shm = require('shm-typed-array');
const fork = require('child_process').fork;

// Create shared memory
const SIZE = 20000000;
const data = shm.create(SIZE, 'Float64Array');

// Fill with dummy data
Array.prototype.fill.call(data, 1);

// Spawn child, set up communication, and give shared memory
const child = fork("child.js");
child.on('message', sum => {
    console.log(`Got answer: ${sum}`);

    // Demo only; ideally you'd re-use the same child
    child.kill();
});
child.send(data.key);

child.js

"use strict";

const shm = require('shm-typed-array');

process.on('message', key => {
    // Get access to shared memory
    const data = shm.get(key, 'Float64Array');

    // Perform processing
    const sum = Array.prototype.reduce.call(data, (a, b) => a + b, 0);

    // Return processed data
    process.send(sum);
});
请注意,我们仅通过IPC向子进程发送一个小的“密钥”,而不是整个数据。因此,我们节省了大量内存和时间。
当然,您可以将“Float64Array”(例如一个“double”)更改为应用程序需要的任何类型化数组。请注意,这个特定的库只处理单维类型化数组;但这应该只是一个小障碍。

1

我也能够重现你所遇到的延迟问题,但可能没有你那么严重。我使用了以下方法:

// main.js
const fork = require('child_process').fork

const child = fork('./getStats.js')

const dataAsNumbers = Array(100000).fill(0).map(() =>
  Array(100).fill(0).map(() => Math.round(Math.random() * 100)))

child.send({
  dataAsNumbers: dataAsNumbers,
})

And

// getStats.js
process.on('message', function (data) {
  console.log('data is ', data)
  process.exit(0)
})

node main.js 2.72秒 用户 0.45秒 系统 103% CPU 3.045 共计

我正在生成由100个数字构成的100k个元素,以模拟您的数据,请确保在process上使用message事件。但也许您的子进程更复杂,这可能是失败的原因,还取决于您在查询中设置的超时时间。


如果您想获得更好的结果,您可以将数据分块成多个部分,发送到子进程并重新构建以形成初始数组。
另一种可能性是使用第三方库或协议,即使它需要更多的工作。您可以查看 messenger.js 或类似 AMQP 队列的东西,这可以让您使用池来在两个进程之间进行通信,并保证子进程确认消息。有一些 Node 实现,如amqp.node,但仍需要一些设置和配置工作。

谢谢,我有不同的问题,但是你的答案帮助我解决了我的问题。【在TeamCity中运行karma测试】(https://dev59.com/HKvka4cB1Zd3GeqPqEC7) - Adrian Moisa

0

0

你为什么想要创建一个子进程?在不同的子进程之间传输数据很可能会消耗更多的CPU和实时性资源,而这些成本可能比你在同一进程中进行处理所节省的成本还要高。

相反,我建议你考虑在运行在与nodejs主进程相同内存中的工作线程中进行统计计算,以实现超级高效的编码。

你可以使用NAN编写C++代码,并将其发布到工作线程中,然后当完成时,让该工作线程将结果和事件返回到你的nodejs事件循环中。

这样做的好处是你不需要额外的时间来将数据发送到不同的进程,但缺点是你需要编写一些用于线程操作的C++代码,但NAN扩展应该能够为你处理大部分困难的任务。


我猜他想要使用多个核心。 - GroovyDotCom
1
如果将其推送到工作线程,则将使用多个核心。 - Soren

0
为了解决将大量数据传递给子进程时出现的性能问题,可以将数据保存到 .json 或 .txt 文件中,并仅传递文件名给子进程。我采用这种方法已经实现了70%的性能改进。

这并没有回答问题。一旦您拥有足够的声望,您将能够评论任何帖子;相反,提供不需要询问者澄清的答案。- 来自审核 - kavigun

0

你可以考虑使用操作系统管道(在这里找到一个要点)作为你的节点子应用程序的输入。

我知道这不完全是你所要求的,但你可以使用集群模块(包含在节点中)。这样,你就可以获得与你机器核心数量相同的实例来加速处理。此外,如果你在开始处理之前不需要所有数据,请考虑使用流。如果要处理的数据太大,我会将其存储在文件中,以便在过程中出现任何错误时可以重新初始化。以下是一个集群示例。

var cluster = require('cluster');
var numCPUs = 4;

if (cluster.isMaster) {
    for (var i = 0; i < numCPUs; i++) {
        var worker = cluster.fork();
        console.log('id', worker.id)
    }
} else {
    doSomeWork()
}

function doSomeWork(){
    for (var i=1; i<10; i++){
        console.log(i)
    }
}

有关在工作进程之间发送消息的更多信息问题8534462


-1

对于长时间的任务,您可以使用类似 gearman 的工具。您可以将繁重的工作流程放在工作人员身上,这样您就可以设置需要多少个工作人员。例如,我以这种方式处理一些文件,如果需要扩展,您可以创建更多的工作实例,同时我还有不同的工作人员用于不同的任务,如处理 zip 文件、生成缩略图等。好处是这些工作人员可以使用任何语言编写,如 node.js、Java、Python,并且可以轻松地集成到您的项目中。

// worker-unzip.js
const debug = require('debug')('worker:unzip');
const {series, apply} = require('async');
const gearman = require('gearmanode');
const {mkdirpSync} = require('fs-extra');
const extract = require('extract-zip');

module.exports.unzip = unzip;
module.exports.worker = worker;

function unzip(inputPath, outputDirPath, done) {
  debug('unzipping', inputPath, 'to', outputDirPath);
  mkdirpSync(outputDirPath);
  extract(inputPath, {dir: outputDirPath}, done);
}


/**
 *
 * @param {Job} job
 */
function workerUnzip(job) {
  const {inputPath, outputDirPath} = JSON.parse(job.payload);
  series([
    apply(unzip, inputPath, outputDirPath),
    (done) => job.workComplete(outputDirPath)
  ], (err) => {
    if (err) {
      console.error(err);
      job.reportError();
    }
  });
}

function worker(config) {
  const worker = gearman.worker(config);
  if (config.id) {
    worker.setWorkerId(config.id);
  }

  worker.addFunction('unzip', workerUnzip, {timeout: 10, toStringEncoding: 'ascii'});
  worker.on('error', (err) => console.error(err));

  return worker;
}

一个简单的index.js
const unzip = require('./worker-unzip').worker;

unzip(config); // pass host and port of the Gearman server

我通常使用PM2来运行工作进程

与您的代码集成非常容易。类似于以下内容:

//initialize
const gearman = require('gearmanode');

gearman.Client.logger.transports.console.level = 'error';
const client = gearman.client(configGearman); // same host and port

只需将函数名称传递给队列,即可将工作添加到队列中。

const taskpayload = {inputPath: '/tmp/sample-file.zip', outputDirPath: '/tmp/unzip/sample-file/'}
const job client.submitJob('unzip', JSON.stringify(taskpayload));
job.on('complete', jobCompleteCallback);
job.on('error', jobErrorCallback);

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接