当通过socket.io重新连接时,Node.js + socket.io + node-amqp和队列绑定

3
我有一个场景与此示例非常接近:
一个主屏幕:
- 此屏幕(客户端)将通过server:9090/scope连接到socket.io服务器(io.connect("http://server:9090/scope)),并向socket.io服务器发送一个事件"userBindOk"(socket.emit("userBindOk", message)); - 服务器接收连接和"userBindOk"。此时,服务器应获取到rabbitmq服务器的活动连接,并将队列绑定到刚刚通过socket.io连接到应用程序的相应用户。示例: socket.on("connection", function(client){ //client id is 1234 // bind rabbitmq exchange, queue, and: queue.subscribe(//receive callback); }) - 到目前为止,没有问题 - 我可以通过socket.io无问题地发送/接收消息。
但是,如果我刷新页面,所有这些步骤都将再次执行。因此,绑定到队列将发生,但这次是与另一个socket.io客户端会话相关的。这意味着如果我向与第一个socket.io会话相关的队列发送消息(在页面刷新之前),那么该绑定应该(我认为)接收该消息并将其发送到无效的socket.io客户端(页面刷新=在socket.io上下文中的新client.id)。我可以证明这种行为,因为每次刷新页面时,我需要发送x倍的消息。例如:我第一次连接:-所以,1条消息-一个屏幕更新;刷新页面:我需要发送2条消息到队列,只有第二条消息将从“实际”的socket.io客户端会话中接收-每次刷新页面都会发生这种行为(20页刷新,20条消息发送到队列,服务器socket.io的“最后”客户端将向客户端socket.io发送消息以呈现在屏幕上)。
我认为解决方案是:
- 找到一种方法在与socket.io服务器断开连接时“取消绑定”队列-我还没有在node-amqp api中看到此选项(等待它:D) - 找到一种重连socket.io客户端并使用相同的client.id的方法。这样,我就可以识别即将到来的客户端并应用一些逻辑来缓存套接字。
有什么想法吗?我试图非常清楚...但是,正如您所知道的那样,在尝试澄清某个特定于某个上下文的问题时,很难表达您的问题...
1个回答

1

我是这样解决的:

我曾经将rabbitMq队列声明为durable=true,autoDelete=false,exclusive=false,在我的应用程序中有1个队列/用户和1个交换机(类型=direct),路由键名称=queueName,我的应用程序还将队列用于其他客户端,如安卓应用或iPhone应用作为推送回退,因此我为每个用户创建了一个队列。

解决此问题的方法是更改我的rabbitMQ队列和交换机声明。现在,我将exchange/user声明为fanout和autoDelete=True,并且用户将拥有N个队列,其中durable=true,autoDelete=true,exclusive=true(队列数=客户端数),并且所有队列都绑定到用户交换机(多播)。

注意:我的应用程序是使用django编写的,我使用node+socket+amqp来通过web.scokets与浏览器通信,因此我使用node-restler查询我的应用程序API以获取用户队列信息。

这就是rabbitMQ方面的解决方法,对于node+amqp+socket,我做了以下操作:

服务器端:

  • onConnect: 声明用户交换机为fanout、autoDelete、durable。然后声明队列为durable、autodelete和exclusive,然后将队列绑定到用户交换机,最后订阅队列并socket.disconnect将销毁队列,因此当客户端连接应用程序时,队列将存在,这解决了刷新问题,并允许用户在应用程序中拥有多个窗口选项卡:

服务器端:

            /*
             * unCaught exception handler
             */

            process.on('uncaughtException', function (err) {
                sys.p('Caught exception: ' + err);
                global.connection.end();
            });


            /*
             * Requiere libraries
             */

            global.sys =  require('sys');
            global.amqp = require('amqp');
            var rest = require('restler');
            var io = require('socket.io').listen(8080);

            /*
             * Module global variables
             */
            global.amqpReady = 0;


            /*
             * RabbitMQ connection
             */

            global.connection = global.amqp.createConnection({
                             host: host,
                             login: adminuser,
                             password: adminpassword,
                             vhost: vhost
                            });

            global.connection.addListener('ready', 
                        function () {
                            sys.p("RabbitMQ connection stablished");
                            global.amqpReady = 1;
                        }
            );


            /*
             * Web-Socket declaration
             */ 

            io.sockets.on('connection', function (socket) {
                socket.on('message', function (data) {
                    sys.p(data);
                    try{
                        var message = JSON.parse(data);                 
                    }catch(error){
                        socket.emit("message", JSON.stringify({"error": "invalid_params", "code": 400}));
                        var message = {};
                    }           
                    var message = JSON.parse(data);
                    if(message.token != undefined) {

                      rest.get("http://dev.kinkajougames.com/api/push",
                                {headers: 
                                    {
                                        "x-geochat-auth-token": message.token 
                                    }
                                }).on('complete', 
                                    function(data) {
                                        a = data;
                                }).on('success',
                                    function (data){
                                        sys.p(data);
                                        try{                                
                                            sys.p("---- creating exchange");
                                            socket.exchange = global.connection.exchange(data.data.bind, {type: 'fanout', durable: true, autoDelete: true});
                                            sys.p("---- declarando queue");
                                            socket.q = global.connection.queue(data.data.queue, {durable: true, autoDelete: true, exclusive: false},
                                                function (){
                                                    sys.p("---- bind queue to exchange");
                                                    //socket.q.bind(socket.exchange, "*");
                                                    socket.q.bind(socket.exchange, "*");
                                                    sys.p("---- subscribing queue exchange");
                                                    socket.q.subscribe(function (message) {
                                                        socket.emit("message", message.data.toString());
                                                    });     
                                                }
                                            );
                                        }catch(err){
                                            sys.p("Imposible to connection to rabbitMQ-server");
                                        }                                   

                                }).on('error', function (data){
                                    a = {
                                        data: data,
                                    };
                                }).on('400', function() {
                                    socket.emit("message", JSON.stringify({"error": "connection_error", "code": 400}));
                                }).on('401', function() {
                                    socket.emit("message", JSON.stringify({"error": "invalid_token", "code": 401}));
                                });               
                    }
                    else {
                      socket.emit("message", JSON.stringify({"error": "invalid_token", "code": 401}));
                    }

                });
                socket.on('disconnect', function () {
                    socket.q.destroy(); 
                    sys.p("closing socket");
                });
            });

客户端:

  • 使用选项'force new connection'=true和'sync disconnect on unload'=false的套接字实例。
  • 客户端使用onbeforeunload和onunload窗口对象事件发送socket.disconnect
  • 客户端在socket.connect事件中向节点发送用户令牌。
  • 从套接字处理消息

            var socket;
            function webSocket(){
                //var socket = new io.Socket();
                socket = io.connect("ws.dev.kinkajougames.com", {'force new connection':true, 'sync disconnect on unload': false});
                //socket.connect();
    onSocketConnect = function(){ alert('已连接'); socket.send(JSON.stringify({ token: Get_Cookie('liveScoopToken') })); };
    socket.on('connect', onSocketConnect); socket.on('message', function(data){ message = JSON.parse(data); if (message.action == "chat") { if (idList[message.data.sender] != undefined) { chatboxManager.dispatch(message.data.sender, { first_name: message.data.sender }, message.data.message); } else { var username = message.data.sender; Data.Collections.Chats.add({ id: username, title: username, user: username, desc: "Chat", first_name: username, last_name: "" }); idList[message.data.sender] = message.data.sender; chatboxManager.addBox(message.data.sender, { title: username, user: username, desc: "Chat", first_name: username, last_name: "", boxClosed: function(id){ alert("closing"); } }); chatboxManager.dispatch(message.data.sender

    就是这样,不再循环发送消息了。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接