NodeJS|Cluster:如何从主进程发送数据到所有或单个子进程/工作进程?

35

我有一个来自node的运作良好的(库存)脚本

var cluster = require('cluster');
var http = require('http');
var numReqs = 0;

if (cluster.isMaster) {
  // Fork workers.
  for (var i = 0; i < 2; i++) {
    var worker = cluster.fork();

    worker.on('message', function(msg) {
      if (msg.cmd && msg.cmd == 'notifyRequest') {
        numReqs++;
      }
    });
  }

  setInterval(function() {
    console.log("numReqs =", numReqs);
  }, 1000);
} else {
  // Worker processes have a http server.
  http.Server(function(req, res) {
    res.writeHead(200);
    res.end("hello world\n");
    // Send message to master process
    process.send({ cmd: 'notifyRequest' });
  }).listen(8000);
}
在上面的脚本中,我可以轻松地从工作进程向主进程发送数据。但如何从主进程发送数据到工作进程/工作进程?如果有示例,请说明。
7个回答

51

由于cluster.fork是基于child_process.fork实现的,您可以通过使用worker.send({ msg: 'test' })从主进程向工作进程发送消息,从工作进程向主进程发送消息则使用process.send({ msg: 'test' });。您可以像这样接收消息:worker.on('message', callback)(从工作进程到主进程),process.on('message', callback);(从主进程到工作进程)。

下面是一个完整的示例,您可以通过浏览http://localhost:8000/进行测试,然后工作进程将向主进程发送一条消息,主进程将做出回复:

var cluster = require('cluster');
var http = require('http');
var numReqs = 0;
var worker;

if (cluster.isMaster) {
  // Fork workers.
  for (var i = 0; i < 2; i++) {
    worker = cluster.fork();

    worker.on('message', function(msg) {
      // we only want to intercept messages that have a chat property
      if (msg.chat) {
        console.log('Worker to master: ', msg.chat);
        worker.send({ chat: 'Ok worker, Master got the message! Over and out!' });
      }
    });

  }
} else {
  process.on('message', function(msg) {
    // we only want to intercept messages that have a chat property
    if (msg.chat) {
      console.log('Master to worker: ', msg.chat);
    }
  });
  // Worker processes have a http server.
  http.Server(function(req, res) {
    res.writeHead(200);
    res.end("hello world\n");
    // Send message to master process
    process.send({ chat: 'Hey master, I got a new request!' });
  }).listen(8000);
}

2
听起来还不错,确保你使用Socket.IO和RedisStore。 - alessioalex
2
这行不通。for循环内的varworker将会保存最后一个被fork的worker,而不是每个worker(特别是在事件回调函数内)。如果你不关心全部,你可以仅仅封闭你的回调函数;或者你可以将所有的workers存放在数组中。 - dresende
抱歉关于变量的问题,我复制了他代码的一部分。我没有将所有的工作人员都放入一个数组中,因为我只是想证明其功能性。 - alessioalex
@alessioalex 通过工作进程发送数据比使用Redis更慢还是更快? - NiCk Newman
@NiCkNewman 说实话,我不太确定。使用集群时,您将使用 child_process,我认为它会打开一个套接字来处理进程间通信。您应该使用真实数据进行几次测试,并查看它们的表现如何。 - alessioalex
显示剩余2条评论

8

我在寻找一种向所有子进程发送消息的方法时,发现了这个帖子,并且由于数组的评论,我很幸运地解决了问题。只是想说明利用这种方法发送消息给所有子进程的一个潜在解决方案。

var cluster = require('cluster');
var http = require('http');
var numReqs = 0;
var workers = [];

if (cluster.isMaster) {
  // Broadcast a message to all workers
  var broadcast = function() {
    for (var i in workers) {
      var worker = workers[i];
      worker.send({ cmd: 'broadcast', numReqs: numReqs });
    }
  }

  // Fork workers.
  for (var i = 0; i < 2; i++) {
    var worker = cluster.fork();

    worker.on('message', function(msg) {
      if (msg.cmd) {
        switch (msg.cmd) {
          case 'notifyRequest':
            numReqs++;
          break;
          case 'broadcast':
            broadcast();
          break;
        }
    });

    // Add the worker to an array of known workers
    workers.push(worker);
  }

  setInterval(function() {
    console.log("numReqs =", numReqs);
  }, 1000);
} else {
  // React to messages received from master
  process.on('message', function(msg) {
    switch(msg.cmd) {
      case 'broadcast':
        if (msg.numReqs) console.log('Number of requests: ' + msg.numReqs);
      break;
    }
  });

  // Worker processes have a http server.
  http.Server(function(req, res) {
    res.writeHead(200);
    res.end("hello world\n");
    // Send message to master process
    process.send({ cmd: 'notifyRequest' });
    process.send({ cmd: 'broadcast' });
  }).listen(8000);
}

8

以下是我如何实现类似问题的解决方案。通过钩入cluster.on('fork'),您可以将消息处理程序附加到在分叉时创建的工作进程上(而不是将它们存储在数组中),这样做的额外优点是可以处理工作进程死亡或断开连接并重新分叉的情况。

以下代码片段将向所有工作进程发送来自主进程的消息。

if (cluster.isMaster) {
    for (var i = 0; i < require('os').cpus.length; i++) {
        cluster.fork();
    }

    cluster.on('disconnect', function(worker) {
        cluster.fork();
    }

    // When a new worker process is forked, attach the handler
    // This handles cases where new worker processes are forked
    // on disconnect/exit, as above.
    cluster.on('fork', function(worker) {
        worker.on('message', messageRelay);
    }

    var messageRelay = function(msg) {
        Object.keys(cluster.workers).forEach(function(id) {
            cluster.workers[id].send(msg);
        });
    };
}
else {
    process.on('message', messageHandler);

    var messageHandler = function messageHandler(msg) {
        // Worker received message--do something
    };
}

3
我了解您想要向集群中的所有节点工作进程广播信息,虽然您不能直接发送socket组件,但是有一些解决方案可以达到目的。我会通过以下示例尝试解释: 步骤1:当客户端操作需要广播时:
Child.js (Process that has been forked) :

socket.on("BROADCAST_TO_ALL_WORKERS", function (data) 
{
    process.send({cmd : 'BROADCAST_TO_ALL_WORKERS', message :data.message});
}) 

步骤 2:在集群创建侧

Server.js (Place where cluster forking happens):

if (cluster.isMaster) {

  for (var i = 0; i < numCPUs; i++) {

    var worker = cluster.fork();

    worker.on('message', function (data) {
     if (data.cmd === "BROADCAST_TO_ALL_WORKERS") {
       console.log(server_debug_prefix() + "Server Broadcast To All, Message : " + data.message + " , Reload : " + data.reload + " Player Id : " + data.player_id);
        Object.keys(cluster.workers).forEach(function(id) {
            cluster.workers[id].send({cmd : "BROADCAST_TO_WORKER", message : data.message});
        });
      }
    });
  }

  cluster.on('exit', function (worker, code, signal) {
    var newWorker = cluster.fork();
    newWorker.on('message', function (data) {
      console.log(data);
      if (data.cmd === "BROADCAST_TO_ALL_WORKERS") {
        console.log(data.cmd,data);
        Object.keys(cluster.workers).forEach(function(id) {
            cluster.workers[id].send({cmd : "BROADCAST_TO_WORKER", message : data.message});
        });
      }
    });
  });
} 
else {
  //Node Js App Entry
  require("./Child.js");
}

第三步:在子进程中进行广播 - -> 在Child.js中的io.on("connection")之前加入此代码。
process.on("message", function(data){
    if(data.cmd === "BROADCAST_TO_WORKER"){
        io.sockets.emit("SERVER_MESSAGE", { message: data.message, reload: data.reload, player_id : data.player_id });
    }
});

我希望这可以帮到您。如果需要更多解释,请告诉我。

1

您应该能够像这样从主节点向工作节点发送消息:

worker.send({message:'hello'})

因为"cluster.fork是在child_process.fork的基础上实现的" (cluster.fork is implemented on top of child_process.fork)

是的,它有效了,谢谢!换句话说:在分叉工作进程时,我应该将它们存储在数组中。并且按顺序迭代此数组以向每个子进程发送数据。是否有其他方法可以发送数据到所有工作进程而无需存储和迭代? - htonus
如果您不想将工作人员存储到数组中并迭代它们以发送消息,则可以使用Unix域套接字从主进程向工作进程发送消息。 - alessioalex
我想你可以在主进程中创建一个EventEmitter,每当接收到消息时就触发一个事件。在创建每个工作进程后,您只需要向EventEmitter添加一个侦听器,该侦听器将把消息发送给工作进程。当然,这仍然是通过将侦听器的引用(因此也是工作进程的引用)存储到EventEmitter中来实现的,但是至少您不必再去查看它了。 - cheng81

0
如果您只需要发送子进程的简单配置数据,则可以使用cluster.fork()发送环境变量。这非常有用,相对于通过集群和进程send方法发送消息具有优势。
const cluster = require('cluster')

if (cluster.isMaster) {
  cluster.fork({
    MY_DATA: 'something here'
  })
} else {
  console.log(process.env.MY_DATA) // "something here"
}

0

为了向所有活跃的工作进程发送消息,需要从Master触发。

注意:这是基于一个工作进程触发消息广播,但它可以被任何东西触发。

const cluster = require('cluster')
const numCPUs = require("os").cpus().length;

// Master
if ( cluster.isMaster ) {

  for (let i = 0; i < numCPUs; i++) {

    let forkedWorker = cluster.fork();

    forkedWorker.on('message', workerMessage => {

      if (workerMessage === 'pleaseUpdateAllWorkers') {

        let workerPool = cluster.workers; // <-- this returns an Object with all worker nodes with their clister.worker.id as their Object Key

        let workerKeys = Object.keys(workerPool); // e.g. [ '1', '2', '3', '4' ] for 4 workers

        workerKeys.forEach( workerKey => {
          let workerProcess = workerPool[workerKey]; // we're accessing this iteration's worker in the workerPool

          // ensure the worker is still online
          if (workerProcess.state === "online") {
            // broadcast
            workerProcess.send('newUpdate!')
          }

        })

        /* for the one-liners
          Object.keys(cluster.workers).forEach( w => (cluster.workers[w].state === "online") ? cluster.workers[w].send('newUpdate!') : null )
        */

      }

    })

  }

}

// Worker
else {

  async function doesSomething(){
  
    await doingSomething();
    
    process.send('pleaseUpdateAllWorkers')
  
  }
  
  process.on('message', masterMessage => {
  
    if (masterMessage === "newUpdate!") {
    
      console.log(`worker id ${cluster.worker.id} received an update from master.`, masterMessage)
    
      // politely respond to master
      process.send('thanks!') 
    }
  
  })

}

如果您想针对一个特定的工作者发送消息,可以使用其ID来发送消息。

workerPool[workerIDYouWantToUpdate].send('newUpdate!')

或者,如果您只想回复发送消息的工作人员:

forkedWorker.send('newUpdate!')

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接