我们有几个 Node.js 进程需要能够相互传递消息,最高效的方式是什么?使用 node_redis 的发布/订阅功能如何?
编辑:这些进程可能在不同的机器上运行。
我们有几个 Node.js 进程需要能够相互传递消息,最高效的方式是什么?使用 node_redis 的发布/订阅功能如何?
编辑:这些进程可能在不同的机器上运行。
如果你想要在不需要回调的情况下从一台机器发送消息到另一台机器,那么Redis的发布/订阅是最佳解决方案。它非常易于实现,而且Redis速度非常快。
首先,您必须在其中一台计算机上安装Redis。
连接到Redis非常简单:
var client = require('redis').createClient(redis_port, redis_host);
但不要忘记在防火墙中打开Redis端口!
然后,您必须将每台机器订阅到某个频道:
client.on('ready', function() {
return client.subscribe('your_namespace:machine_name');
});
client.on('message', function(channel, json_message) {
var message;
message = JSON.parse(json_message);
// do whatever you vant with the message
});
你可以跳过your_namespace
并使用全局命名空间,但早晚会后悔。
发送消息也非常容易:
var send_message = function(machine_name, message) {
return client.publish("your_namespace:" + machine_name, JSON.stringify(message));
};
如果您想发送不同类型的消息,则可以使用“pmessages”而不是“messages”:client.on('ready', function() {
return client.psubscribe('your_namespace:machine_name:*');
});
client.on('pmessage', function(pattern, channel, json_message) {
// pattern === 'your_namespace:machine_name:*'
// channel === 'your_namespace:machine_name:'+message_type
var message = JSON.parse(message);
var message_type = channel.split(':')[2];
// do whatever you want with the message and message_type
});
send_message = function(machine_name, message_type, message) {
return client.publish([
'your_namespace',
machine_name,
message_type
].join(':'), JSON.stringify(message));
};
最佳实践是按照它们的功能(例如'send_email'
)为您的进程(或机器)命名。在这种情况下,如果进程(或机器)实现了多个功能,则可以订阅更多通道。
实际上,使用Redis可以构建双向通信。但这更棘手,因为它需要为每个消息添加唯一的回调通道名称,以便在不丢失上下文的情况下接收回调。
所以,我的结论是:如果您需要“发送并忘记”通信,请使用Redis;如果您需要全功能的双向通信,请尝试其他解决方案。
json.parse
和json.stringify
的性能问题。我正在使用nodejs作为我的游戏服务器,并且正在使用3、4甚至更多的节点实例来与Redis通信(以便进行水平扩展)——这是我正在开发的aRPG游戏,例如攻击怪物、移动等等都会非常繁忙。这样做还可以吗?或者我现在是边缘过早优化的想法?谢谢。 - NiCk Newman为什么不使用ZeroMQ/0mq进行进程间通信?对于像进程间通信这样简单的任务来说,使用Redis(一种数据库)就有点大材小用了。
引用指南中的话:
使用0MQ(或通过Node核心中的net库使用香草套接字,减去0MQ套接字提供的所有功能)的优点是没有主进程。它的无代理设置最适合您所描述的情况。如果您只是从一个中央进程向各个节点推送消息,则可以在0mq中使用PUB / SUB套接字(还支持通过PGM / EPGM进行IP多播)。除此之外,0mq还提供了各种不同类型的套接字(PUSH / PULL / XREP / XREQ / ROUTER / DEALER),您可以使用这些套接字类型创建自定义设备。ØMQ(ZeroMQ、0MQ、zmq)看起来像是一个可嵌入的网络库,但实际上它表现得像一个并发框架。它提供了套接字,可以在各种传输方式之间传递原子消息,例如进程内、进程间、TCP 和多播。您可以使用类似于扇出、发布-订阅、任务分配和请求-响应的模式将套接字连接到 N 对 N。它足够快,可以成为群集产品的基础。它的异步 I/O 模型为您提供可扩展的多核应用程序,构建为异步消息处理任务。
从这个优秀的指南开始: http://zguide.zeromq.org/page:all
对于0MQ 2.x:
http://github.com/JustinTulloss/zeromq.node
对于0MQ 3.x(以上模块的一个分支。这个版本支持PUBLISHER端用于PUBSUB过滤):
距离问题被提出已经超过4年时间了,现在有一个叫做 node-ipc 的进程间通信模块。它支持Unix/Windows sockets用于在同一台机器上进行通信,同时还支持TCP、TLS和UDP,并声称至少sockets、TCP和UDP是稳定的。
这里是从GitHub存储库的文档中摘录的一个小示例:
Unix Socket、Windows Socket和TCP Socket服务器
var ipc=require('node-ipc');
ipc.config.id = 'world';
ipc.config.retry= 1500;
ipc.serve(
function(){
ipc.server.on(
'message',
function(data,socket){
ipc.log('got a message : '.debug, data);
ipc.server.emit(
socket,
'message',
data+' world!'
);
}
);
}
);
ipc.server.start();
Unix套接字和TCP套接字的客户端
var ipc=require('node-ipc');
ipc.config.id = 'hello';
ipc.config.retry= 1500;
ipc.connectTo(
'world',
function(){
ipc.of.world.on(
'connect',
function(){
ipc.log('## connected to world ##'.rainbow, ipc.config.delay);
ipc.of.world.emit(
'message',
'hello'
)
}
);
ipc.of.world.on(
'disconnect',
function(){
ipc.log('disconnected from world'.notice);
}
);
ipc.of.world.on(
'message',
function(data){
ipc.log('got a message from world : '.debug, data);
}
);
}
);
我目前正在评估这个模块,作为替代旧的 stdin/stdout 解决方案的本地 ipc(但将来也可能是远程 ipc)。如果我完成后有时间,可能会扩展我的答案,以提供有关此模块如何工作的更多信息。
'## connected to world ##'.rainbow
)并且其接口在 2019 年已经有点过时了。 - Lucio Paiva如下:
process.on('SIGINT', function () {
console.log('Got SIGINT. Press Control-D to exit.');
});
这是一个信号,表示进程接收到了一个信号。请参见sigaction(2)以获取标准POSIX信号名称列表,例如SIGINT、SIGUSR1等。
当您了解进程时,可以生成一个child-process并将其连接到事件以检索和发送消息。使用
child_process.fork()
时,可以使用child.send(message, [sendHandle])
向子进程写入内容,并通过子进程上的'message'事件接收消息。
此外,您还可以使用cluster。集群模块允许您轻松创建一组共享服务器端口的进程网络。
var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;
if (cluster.isMaster) {
// Fork workers.
for (var i = 0; i < numCPUs; i++) {
cluster.fork();
}
cluster.on('exit', function(worker, code, signal) {
console.log('worker ' + worker.process.pid + ' died');
});
} else {
// Workers can share any TCP connection
// In this case its a HTTP server
http.createServer(function(req, res) {
res.writeHead(200);
res.end("hello world\n");
}).listen(8000);
}
并不是每个人都知道 pm2 有一个API,通过它你可以与其进程进行通信。
// pm2-call.js:
import pm2 from "pm2";
pm2.connect(() => {
pm2.sendDataToProcessId(
{
type: "process:msg",
data: {
some: "data",
hello: true,
},
id: 0,
topic: "some topic",
},
(err, res) => {}
);
});
pm2.launchBus((err, bus) => {
bus.on("process:msg", (packet) => {
packet.data.success.should.eql(true);
packet.process.pm_id.should.eql(proc1.pm2_env.pm_id);
done();
});
});
// pm2-app.js:
process.on("message", (packet) => {
process.send({
type: "process:msg",
data: {
success: true,
},
});
});
几年前,我需要在另一种语言(Perl)的Web服务器进程之间进行IPC。在调查了共享内存、Unix信号(例如SIGINT和信号处理程序)和其他选项后,我最终选择了一些非常简单但效果很好且速度很快的东西。但是,如果您的进程没有访问相同的文件系统,则可能不适用。
这个概念是使用文件系统作为通信渠道。在我的世界中,我有一个EVENTS目录,在其下面有子目录以将消息定向到适当的进程:例如/EVENTS/1234/player1和/EVENTS/1234/player2,其中1234是具有两个不同玩家的特定游戏。如果进程想要了解特定玩家游戏中发生的所有事件,它可以使用以下内容(在Node.js中)监听/EVENTS/1234/player1:
fs.watch(或fsPromises.watch)
如果一个进程想要监听某个特定游戏的所有事件,只需使用 fs.watch 的 'recursive: true' 选项设置来观察 /EVENTS/1234。或者观察 /EVENTS 来查看所有消息 -- fs.watch 产生的事件将告诉您哪个文件路径被修改了。
举个更具体的例子,在我的世界中,我有 player1 的 Web 浏览器客户端监听 Server-Sent Events (SSE),并且有一个循环在一个特定的 Web 服务器进程中运行以发送这些事件。现在,服务于 player2 的 Web 服务器进程想要向运行 player1 SSE 的服务器进程发送一条消息(IPC),但不知道可能是哪个进程;它只是在 /EVENTS/1234/player1 中写入(或修改)一个文件。该目录正在通过处理 player1 SSE 的 Web 服务器进程中的 fs.watch 进行监视。我发现这个系统非常灵活、快速,并且还可以设计成留下所有发送的消息的记录。我使用它使得许多随机的 Web 服务器进程之一可以与另一个特定的 Web 服务器进程通信,但它也可以用于 N 对 1 或 1 对 N 的方式。
希望这能帮助到某些人。你基本上是让操作系统和文件系统为你完成工作。以下是关于如何在MacOS和Linux中实现此功能的一些链接:https://man7.org/linux/man-pages/man7/inotify.7.html
无论你使用什么语言,任何模块都会连接到像这样的API。我已经有30多年没有在Windows上进行过太多的调整,所以我不知道文件系统事件在那里是如何工作的,但我敢打赌有一个等效的。
编辑(来自https://nodejs.org/dist/latest-v19.x/docs/api/fs.html#fswatchfilename-options-listener的不同平台更多信息):
注意事项# fs.watch API在各个平台上并不完全一致,并且在某些情况下不可用。
在Windows上,如果被监视的目录被移动或重命名,则不会发出任何事件。当删除被监视的目录时,会报告EPERM错误。
可用性# 此功能取决于底层操作系统提供通知文件系统更改的方法。
在Linux系统上,这使用inotify(7)。 在BSD系统上,这使用kqueue(2)。 在macOS上,这对文件使用kqueue(2),对目录使用FSEvents。 在SunOS系统(包括Solaris和SmartOS)上,这使用事件端口。 在Windows系统上,此功能取决于ReadDirectoryChangesW。 在AIX系统上,此功能取决于启用了AHAFS。 在IBM i系统上,不支持此功能。 如果由于某种原因基础功能不可用,则fs.watch()将无法正常工作并可能引发异常。例如,在网络文件系统(NFS、SMB等)或主机文件系统上使用虚拟化软件(如Vagrant或Docker)时,监视文件或目录可能会不可靠甚至是不可能的。