使用集群将Socket.IO扩展到多个Node.js进程

72

这个问题让我十分苦恼... 有没有人成功地将 Socket.IO 扩展到由 Node.js 的 cluster 模块生成的多个“worker”进程中?

假设在四个 worker 进程中我有以下伪代码:

// on the server
var express = require('express');
var server = express();
var socket = require('socket.io');
var io = socket.listen(server);

// socket.io
io.set('store', new socket.RedisStore);

// set-up connections...
io.sockets.on('connection', function(socket) {

  socket.on('join', function(rooms) {
    rooms.forEach(function(room) {
      socket.join(room);
    });
  });

  socket.on('leave', function(rooms) {
    rooms.forEach(function(room) {
      socket.leave(room);
    });
  });

});

// Emit a message every second
function send() {
  io.sockets.in('room').emit('data', 'howdy');
}

setInterval(send, 1000);

并且在浏览器中...

// on the client
socket = io.connect();
socket.emit('join', ['room']);

socket.on('data', function(data){
  console.log(data);
});

The problem: 每秒钟我会收到四条消息,因为有四个独立的工作进程在发送消息。

我该如何确保消息只被发送一次?


你使用的是哪个版本的socket.io?Socket.IO 0.6被设计为单进程服务器。请参考3rdEden在这个stackoverflow帖子中的回答。https://dev59.com/wW025IYBdhLWcg3wi2mw - HariKrishnan
3
0.9.16 使用 RedisStore - Lee Benson
你可以使用SocketCluster(接口与Socket.io兼容):https://github.com/topcloud/socketcluster - Jon
4个回答

114
编辑:在 Socket.IO 1.0+ 中,不再需要使用多个 Redis 客户端设置存储,现在可以使用更简单的 Redis 适配器模块。
var io = require('socket.io')(3000);
var redis = require('socket.io-redis');
io.adapter(redis({ host: 'localhost', port: 6379 }));

下面展示的示例将更像这样:
var cluster = require('cluster');
var os = require('os');

if (cluster.isMaster) {
  // we create a HTTP server, but we do not use listen
  // that way, we have a socket.io server that doesn't accept connections
  var server = require('http').createServer();
  var io = require('socket.io').listen(server);
  var redis = require('socket.io-redis');

  io.adapter(redis({ host: 'localhost', port: 6379 }));

  setInterval(function() {
    // all workers will receive this in Redis, and emit
    io.emit('data', 'payload');
  }, 1000);

  for (var i = 0; i < os.cpus().length; i++) {
    cluster.fork();
  }

  cluster.on('exit', function(worker, code, signal) {
    console.log('worker ' + worker.process.pid + ' died');
  }); 
}

if (cluster.isWorker) {
  var express = require('express');
  var app = express();

  var http = require('http');
  var server = http.createServer(app);
  var io = require('socket.io').listen(server);
  var redis = require('socket.io-redis');

  io.adapter(redis({ host: 'localhost', port: 6379 }));
  io.on('connection', function(socket) {
    socket.emit('data', 'connected to worker: ' + cluster.worker.id);
  });

  app.listen(80);
}

如果您有一个主节点需要发布到其他Socket.IO进程,但本身不接受套接字连接,请使用socket.io-emitter而不是socket.io-redis
如果您遇到扩展问题,请使用DEBUG = *运行您的Node应用程序。 Socket.IO现在实现了debug,它还会打印出Redis适配器调试消息。示例输出:
socket.io:server initializing namespace / +0ms
socket.io:server creating engine.io instance with opts {"path":"/socket.io"} +2ms
socket.io:server attaching client serving req handler +2ms
socket.io-parser encoding packet {"type":2,"data":["event","payload"],"nsp":"/"} +0ms
socket.io-parser encoded {"type":2,"data":["event","payload"],"nsp":"/"} as 2["event","payload"] +1ms
socket.io-redis ignore same uid +0ms

如果您的主进程和子进程都显示相同的解析器消息,则表示您的应用程序已经正确地扩展。
如果您只从一个工作进程发射信号,那么您的设置应该没有问题。但是,您正在从所有四个工作进程发射信号,由于Redis发布/订阅机制,消息不会重复,但会被写入四次,因为您要求应用程序这样做。以下是Redis执行的简单示意图:
Client  <--  Worker 1 emit -->  Redis
Client  <--  Worker 2  <----------|
Client  <--  Worker 3  <----------|
Client  <--  Worker 4  <----------|

正如您所看到的,当您从工作进程发出时,它将向Redis发布该发射,并会从已订阅Redis数据库的其他工作进程进行镜像。这也意味着您可以使用连接到同一实例的多个套接字服务器,一个服务器上的发射将在所有连接的服务器上触发。
使用集群时,当客户端连接时,它将连接到您的四个工作进程之一,而不是全部四个。这也意味着您从该工作进程发出的任何内容只会显示一次给客户端。因此,是的,应用正在扩展,但是您的方式是从所有四个工作进程发出,而Redis数据库使其好像您在单个工作进程上调用了四次。如果客户端实际上连接到您的所有四个套接字实例,则每秒将接收到16条消息,而不是4条。
套接字处理类型取决于您要拥有的应用程序类型。如果您要单独处理客户端,则不应该有问题,因为连接事件仅对每个客户端的一个工作进程触发。如果您需要全局“心跳”,则可以在主进程中拥有套接字处理程序。由于工作进程在主进程死亡时死亡,因此您应该将连接负载偏移主进程,并让子进程处理连接。以下是一个示例:
var cluster = require('cluster');
var os = require('os');

if (cluster.isMaster) {
  // we create a HTTP server, but we do not use listen
  // that way, we have a socket.io server that doesn't accept connections
  var server = require('http').createServer();
  var io = require('socket.io').listen(server);

  var RedisStore = require('socket.io/lib/stores/redis');
  var redis = require('socket.io/node_modules/redis');

  io.set('store', new RedisStore({
    redisPub: redis.createClient(),
    redisSub: redis.createClient(),
    redisClient: redis.createClient()
  }));

  setInterval(function() {
    // all workers will receive this in Redis, and emit
    io.sockets.emit('data', 'payload');
  }, 1000);

  for (var i = 0; i < os.cpus().length; i++) {
    cluster.fork();
  }

  cluster.on('exit', function(worker, code, signal) {
    console.log('worker ' + worker.process.pid + ' died');
  }); 
}

if (cluster.isWorker) {
  var express = require('express');
  var app = express();

  var http = require('http');
  var server = http.createServer(app);
  var io = require('socket.io').listen(server);

  var RedisStore = require('socket.io/lib/stores/redis');
  var redis = require('socket.io/node_modules/redis');

  io.set('store', new RedisStore({
    redisPub: redis.createClient(),
    redisSub: redis.createClient(),
    redisClient: redis.createClient()
  }));

  io.sockets.on('connection', function(socket) {
    socket.emit('data', 'connected to worker: ' + cluster.worker.id);
  });

  app.listen(80);
}

在这个例子中,有五个 Socket.IO 实例,其中一个是主进程,另外四个是子进程。主服务器从不调用 listen(),因此该进程上没有连接开销。但是,如果您在主进程上调用 emit,则会将其发布到 Redis,并且四个工作进程将在其客户端上执行 emit。这将连接负载偏移给工作进程,如果一个工作进程死亡,则主进程中的主要应用程序逻辑将不受影响。
请注意,在 Redis 中,所有的 emit,即使在命名空间或房间中,也会被其他工作进程处理,就好像您从该进程触发了 emit 一样。换句话说,如果您有两个 Socket.IO 实例和一个 Redis 实例,在第一个工作进程中调用 emit() 将向其客户端发送数据,而第二个工作进程将执行与从该工作进程调用 emit 相同的操作。

好的答案。谢谢!它在某种程度上起作用了。当我从主服务器发出 io.sockets.emit('userstreamssock', postid); 时,工作节点没有接收到它。不确定为什么。 - Srikanth Jeeva
6
仅供参考:在socket.io > 1.0版本中无法使用,必须使用redis适配器。可参考链接http://socket.io/docs/using-multiple-nodes/ 我还没有成功地运行过带有集群和socket.io 1.1.0的示例。 - DerM
1
@DerM 我也是一样。运行 socket.io 1.3.5,我没有找到任何可行的方法。添加 Sticky Session,更改 HAProxy 配置……这些都不能让 socket 在集群中工作。 - RedGiant
我已经添加了一个Socket.IO 1.0+的示例,并在1.3.5上进行了测试。请注意,对于主节点,应该使用socket.io-emitter,因为它是一个非监听进程,但我省略了它以使答案更简单。 - hexacyanide
1
我在前端遇到了一个错误... socket.io.min.js:2 GET http://localhost:3000/socket.io/?EIO=3&transport=polling&t=LYqSrsK 404 (未找到) - cabs
显示剩余5条评论

2

让主进程处理您的心跳(如下例所示),或在内部启动多个进程并使用nginx进行负载均衡(nginx从V1.3版本开始支持websockets)。

主进程集群

// on the server
var express = require('express');
var server = express();
var socket = require('socket.io');
var io = socket.listen(server);
var cluster = require('cluster');
var numCPUs = require('os').cpus().length;

// socket.io
io.set('store', new socket.RedisStore);

// set-up connections...
io.sockets.on('connection', function(socket) {
    socket.on('join', function(rooms) {
        rooms.forEach(function(room) {
            socket.join(room);
        });
    });

    socket.on('leave', function(rooms) {
        rooms.forEach(function(room) {
            socket.leave(room);
        });
    });

});

if (cluster.isMaster) {
    // Fork workers.
    for (var i = 0; i < numCPUs; i++) {
        cluster.fork();
    }

    // Emit a message every second
    function send() {
        console.log('howdy');
        io.sockets.in('room').emit('data', 'howdy');
    }

    setInterval(send, 1000);


    cluster.on('exit', function(worker, code, signal) {
        console.log('worker ' + worker.process.pid + ' died');
    }); 
}

不是一个坏建议,但这样仍然只有一个主进程负责潜在的500,000个websocket连接...并不能真正解决跨多个服务器/每个服务器进程的可扩展性问题 - Lee Benson
这样怎么样:使用2层负载均衡器。 AWS示例: 第一层使用弹性负载均衡器在多台机器之间分配工作负载。第二层在机器上的多个实例之间分配工作负载。您可以运行cpu.count节点实例,并通过nginx将工作负载分配给它们,或者使用节点集群(在这种情况下不需要nginx)。我更喜欢nginx版本。对于自动缩放,请使用OpsWork并让它根据CPU负载处理您的缩放。它会自动添加和删除机器,并且设置非常容易。 - Taner Topal
当我使用 var socket = require('socket.io')(1338); 时,我会得到这个错误 Error: listen EADDRINUSE :::1338。如何在同一端口上实现? - Rizwan Patel

1
这实际上看起来是Socket.IO在扩展方面取得了成功。你期望从一个服务器发送的消息会发往该房间中的所有socket,无论他们连接到哪个服务器。
你最好的选择是有一个主进程每秒发送一条消息。例如,您可以通过只在cluster.isMaster运行来实现这一点。

它在“共享”套接字方面取得了成功,但无法确定哪些消息不应重复。集群是一个很好的想法,但这并不真正意味着“扩展”...它只是一个进程管理4个工作的过程。 - Lee Benson
@李,你希望它使用什么逻辑来决定是否“复制”消息?当你向一个房间发送消息时,它会发送给房间中的每个人 - 这是预期的行为。如果你想让每个进程都按时间间隔发送消息,你可以为每个进程创建一个房间。 - Aaron Dufour
我猜更好的逻辑是让socket.emit在进程间同步。不确定如何实现。当有10个不同的每个有4个核心的服务器时,“每个进程一个房间”的方法并不能解决可扩展性问题...但当只涉及一个服务器时,这可能是一个好主意。 - Lee Benson
@Lee Socket.IO通常的使用方式是,一个发生在一个服务器上的事件(例如http请求)会触发一个消息发送到一个房间。您希望这个消息发送给房间中的每个人,而不仅仅是连接到同一服务器的人。 “一个进程管理4个工作”-我不确定您的实际逻辑是什么,但每秒钟发送一条消息并不会很费力。 - Aaron Dufour
我的目标是真正弄清楚如何在规模上做到这一点。现在,对于例如10,000个客户端来说,这并不费力...但是当它达到一百万呢?我正在构建的应用程序具有大量的Web套接字连接,用于相当高需求的统计应用程序,API可以轻松地在短时间内达到每天10百万以上的套接字事务。我只是想准备好按需扩展 - 除了1个服务器,1个进程模型之外,仍然不确定如何做到这一点。 - Lee Benson
@Lee 如果没有更多关于你正在做什么的信息,那么帮助你会很困难。我猜想你的服务器不会每秒钟都对所有客户端进行ping测试吧? - Aaron Dufour

0

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接