WebSocket传输的可靠性(在重新连接期间Socket.io数据丢失)

92

已使用技术

NodeJS, Socket.io

问题

假设有两个用户U1U2通过Socket.io连接到一个应用程序上,算法如下:

  1. U1完全失去Internet连接(例如关闭Internet)
  2. U2U1发送消息。
  3. U1暂时无法接收到消息,因为网络中断了
  4. 服务器通过心跳超时检测到U1的断开连接
  5. U1重新连接到socket.io
  6. U1从未收到来自U2的消息-我猜它在第4步丢失。

可能的解释

我认为我明白为什么会发生这种情况:

  • 在第4步中,服务器终止了socket实例和消息队列到U1的连接
  • 此外,在第5步中,U1服务器创建了新的连接(不是重复使用的),因此即使消息仍在排队中,先前的连接也已丢失。

需要帮助

如何防止这种数据丢失?我必须使用心跳,因为我不想让人们在应用程序中挂起。同时我必须仍然给予重新连接的可能性,因为当我部署新版本的应用程序时,我希望没有停机时间。

P.S. 我所说的“消息”不仅是我可以存储在数据库中的文本消息,而是有价值的系统消息,其传递必须得到保证,否则UI会出问题。


补充 1

我已经拥有用户账户系统。此外,我的应用程序已经很复杂了。添加离线/在线状态不会有所帮助,因为我已经有了这样的东西。问题是不同的。

请注意第2步。 在此步骤中,我们从技术上讲不能说明U1是否下线,他只会失去连接,比如因为网络不好而持续2秒钟。所以U2发送了一条消息,但由于他的Internet仍旧处于断开状态(第3步),所以U1无法接收。第4步是检测脱机用户的必要步骤,例如,超时时间为60秒。最终,另外10秒钟后,U1的Internet连接再次恢复,他重新连接到socket.io。但是来自U2的消息在空间中丢失了,因为在服务器上,U1是按超时断开连接的。

这就是问题所在,我希望实现100%的传递。


解决方法

  1. 通过随机emitID识别{}用户中的emit(emit名称和数据)。发送emit。
  2. 在客户端确认emit(使用emitID向服务器发送emit)
  3. 如果已确认-从由emitID标识的对象中删除
  4. 如果用户重新连接-检查{}是否包含此用户,并循环执行每个对象中的步骤1
  5. 当断开连接或/和连接时,根据需要刷新{}用户
// Server
const pendingEmits = {};

socket.on('reconnection', () => resendAllPendingLimits);
socket.on('confirm', (emitID) => { delete(pendingEmits[emitID]); });

// Client
socket.on('something', () => {
    socket.emit('confirm', emitID);
});

解决方案2(有点类似)

添加于2020年2月1日。

虽然这不是Websockets的真正解决方案,但某些人可能仍会发现它很方便。我们从Websockets迁移到了SSE+Ajax。 SSE允许您从客户端连接以保持持久的TCP连接并实时接收来自服务器的消息。要从客户端向服务器发送消息 - 只需使用Ajax即可。存在延迟和开销等缺点,但SSE保证可靠性,因为它是一个TCP连接。

由于我们使用Express,因此我们使用此库进行SSE https://github.com/dpskvn/express-sse,但您可以选择适合您的库。

IE和大多数Edge版本不支持SSE,因此您需要使用polyfill:https://github.com/Yaffle/EventSource


没错。但是socket.io只是一个传输协议。它本身无法保证一致和可靠的消息传递。你应该研究(并阅读)发布-订阅架构和消息队列。在实践中,您将使用像redis这样的持久性数据库来存储消息。 - user568109
那 pubsub 可以解决这个问题吗?如果您撰写了全面的答案并且解决方案有效,您将获得50点赏金。 - igorpavlov
9
这是一个组织得非常好的问题。 - Katie
1
谢谢。我必须说接受的答案对我有效。目前我使用建议的方案,并没有问题。 - igorpavlov
嗨,Igor!我是Node.js和Socket.io的新手。如果可能的话,你能展示一下你的代码吗? :) - Eazy
显示剩余3条评论
8个回答

120

其他答案和评论中已经有人提到了这一点,但根本问题在于Socket.IO只是一种传递机制,您不能仅依赖它进行可靠的传输。确切知道消息是否成功传递到客户端的唯一人是客户端本身。对于这种系统,我建议做出以下断言:

  1. 消息不直接发送给客户端,而是发送到服务器并存储在某种数据存储器中。
  2. 当客户端重新连接时,客户端负责询问“我错过了什么”,并查询数据存储器中的存储消息以更新其状态。
  3. 如果在接收方客户端连接时将消息发送到服务器,则该消息将实时发送到客户端。

当然,根据应用程序的需求,您可以调整其中的各个部分。例如,您可以使用Redis列表或排序集来存储消息,并在确定客户端已经获得最新消息后将其清除。


以下是一些示例:

正常路径

  • U1和U2都连接到系统。
  • U2向服务器发送一条消息,该消息应该由U1接收。
  • 服务器将消息存储在某种持久性存储器中,并使用某种时间戳或顺序ID标记它为U1的消息。
  • 服务器通过Socket.IO将消息发送给U1。
  • U1的客户端通过Socket.IO回调确认它已接收到该消息。
  • 服务器从数据存储器中删除持久性消息。

离线路径

  • U1失去互联网连接。
  • U2向服务器发送一条消息,该消息应该由U1接收。
  • 服务器将消息存储在某种持久性存储器中,并使用某种时间戳或顺序ID标记它为U1的消息。
  • 服务器通过Socket.IO将消息发送给U1。
  • 由于U1离线,其客户端不会确认收到消息。
  • 也许U2会发送更多消息给U1,它们都会以相同的方式存储在数据存储中。
  • 当U1重新连接时,它会向服务器请求:“我看到的最后一条消息是X / 我有状态X,我错过了什么。”
  • 服务器基于U1的请求,从数据存储中发送给U1它错过的所有消息。
  • U1的客户端确认收到消息后,服务器将从数据存储中删除这些消息。

  • 如果您绝对需要确保消息传递,则设计系统的方式非常重要,使得连接实际上并不重要,并且实时传递只是一个额外的好处;这几乎总是涉及某种类型的数据存储。正如用户568109在评论中提到的,有些消息系统抽象了所述消息的存储和传递,并且可能值得研究此类预构建解决方案。(您可能仍然需要自己编写Socket.IO集成。)

    如果您不想将消息存储在数据库中,则可以尝试将它们存储在本地数组中;服务器尝试将消息发送给U1,并将其存储在“待处理消息”列表中,直到U1的客户端确认收到它。如果客户端离线,则当它回来时,可以告诉服务器“嘿,我断开连接了,请发送我错过的所有内容”,然后服务器可以遍历这些消息。

    幸运的是,Socket.IO提供了一种机制,允许客户端“响应”看起来像本机JS回调的消息。以下是一些伪代码:

    // server
    pendingMessagesForSocket = [];
    
    function sendMessage(message) {
      pendingMessagesForSocket.push(message);
      socket.emit('message', message, function() {
        pendingMessagesForSocket.remove(message);
      }
    };
    
    socket.on('reconnection', function(lastKnownMessage) {
      // you may want to make sure you resend them in order, or one at a time, etc.
      for (message in pendingMessagesForSocket since lastKnownMessage) {
        socket.emit('message', message, function() {
          pendingMessagesForSocket.remove(message);
        }
      }
    });
    
    // client
    socket.on('connection', function() {
      if (previouslyConnected) {
        socket.emit('reconnection', lastKnownMessage);
      } else {
        // first connection; any further connections means we disconnected
        previouslyConnected = true;
      }
    });
    
    socket.on('message', function(data, callback) {
      // Do something with `data`
      lastKnownMessage = data;
      callback(); // confirm we received the message
    });
    

    这与上一个建议非常相似,只是没有持久化数据存储。


    您可能还对“事件溯源”(event sourcing)的概念感兴趣。


    3
    我一直等待最终的全面答复,并声明客户必须确认交货。似乎真的没有其他方式。 - igorpavlov
    1
    很高兴能够帮助!如果您有任何问题,请联系我 - Michelle Tilley
    这将在一对一聊天场景中起作用。在发送消息到多个用户的房间示例中会发生什么。广播/ socket.in不支持回调。那么我们如何处理这种情况?这是我的问题。(http://stackoverflow.com/questions/43186636/socket-io-broadcast-rooms-and-acknowledgement-function) - jit

    4

    米歇尔的回答基本上正确,但还有一些其他重要的事情需要考虑。 主要问题是: "我的应用程序中用户和套接字之间是否存在区别?" 另一种问法是“每个已登录用户是否可以同时拥有多个套接字连接?”

    在网络世界中,单个用户可能始终具有多个套接字连接,除非您特别设置了防止此类情况的功能。 最简单的例子就是如果一个用户打开了同一页面的两个选项卡。 在这些情况下,您不关心仅向人类用户发送一次消息/事件...您需要将其发送到该用户的每个套接字实例,以便每个选项卡都可以运行其回调以更新ui状态。 或许对某些应用程序来说这并不是一个问题,但我的直觉告诉我它对大多数应用程序来说都是一个问题。 如果这对您很重要,请继续阅读....

    为解决这个问题(假定您使用数据库作为持久性存储),需要三个表。

    1. 用户- 与真实人员1比1
    2. 客户端- 表示“选项卡”,可能只有单个连接到套接字服务器。(任何“用户”都可能有多个)
    3. 消息- 需要发送到客户端的消息(不是需要发送到用户或套接字的消息)

    如果您的应用程序不需要,可以选择省略用户表,但OP称他们有一个。

    还需要正确定义的另一件事是“什么是套接字连接?”,“何时创建套接字连接?”,“何时重用套接字连接?” Michelle的伪代码让人觉得可以重用套接字连接。 使用Socket.IO,它们无法重复使用。 我看到它引起了很多混乱。 在现实生活中,有真正的场景与Michelle的示例相符合。 但我想象这些情况很少见。 真正发生的是当套接字连接丢失时,该连接,ID等将永远不会被重复使用。 因此,特别标记为该套接字的任何消息都不会传递给任何人,因为当原始连接的客户端重新连接时,它们会获得全新的连接和新ID。 这意味着您需要做一些事情来跟踪客户端(而不是套接字或用户)跨多个套接字连接。

    因此,在基于Web的示例中,我建议采取以下步骤:

    • 当用户加载具有创建套接字连接潜力的客户端(通常是单个网页)时,请向与其用户ID关联的客户端数据库添加一行。
    • 当用户实际连接到套接字服务器时,将客户端ID与连接请求一起传递给服务器。
    • 服务器应验证用户是否允许连接,并检查客户端表中的客户端行是否可用于连接并相应地进行允许/拒绝。
    • 使用Socket.IO生成的套接字ID更新客户端行。
    • 发送与客户端ID相关联的消息表中的任何项目。在初始连接时不会有任何项目,但如果这是客户端尝试重新连接,则可能存在一些项目。
    • 每次需要向该套接字发送消息时,在与您生成的客户端ID(而不是套接字ID)链接的消息表中添加一行。
    • 尝试发出消息并监听具有确认的客户端。
    • 当您收到确认时,从消息表中删除该项。
    • 您可能希望在客户端上创建某些逻辑,以丢弃从服务器发送的重复消息,因为一些人指出这在技术上是可能的。
    • 然后,当客户端与套接字服务器断开连接(故意或通过错误),请勿删除客户端行,最多只清除套接字ID。这是因为同一客户端可能会尝试重新连接。
    • 当客户端尝试重新连接时,请发送与原始连接尝试一起发送的相同客户端ID。服务器将像初始连接一样查看此操作。
    • 当客户端被销毁(用户关闭选项卡或导航到其他页面)时,这是您删除客户端行和所有属于该客户端的消息的时间。这一步可能有点棘手。

    由于最后一步有些棘手(至少以前是这样,我已经很久没做过这样的事情了),并且存在诸如断电等情况,客户端将在不清理客户端行的情况下断开连接,并且永远不会尝试使用该相同的客户端行进行重新连接 - 您可能希望定期运行某些东西以清除任何陈旧的客户端和消息行。或者,您可以永久存储所有客户端和消息,并仅标记它们的状态。

    因此,要明确,在一个用户有两个选项卡打开的情况下,您将向消息表中添加两条相同的消息,每条消息都标记了不同的客户端,因为您的服务器需要知道每个客户端是否都收到了它们,而不仅仅是每个用户。


    3

    如另一个回答中已经写过的那样,我也认为您应该将实时性作为奖励考虑:该系统应该能够在没有实时性的情况下正常工作。

    我正在为一家大型公司开发企业聊天(ios、android、Web前端、.net core + postGres后端)。在通过Socket UUID重新建立连接的方式(通过队列存储的未交付消息)后,我理解到有更好的解决方案:通过REST API同步。

    基本上,我最终只使用websocket进行实时通讯,并为每个实时消息(用户在线、打字者、聊天消息等)添加一个整数标签用于监视丢失消息。

    当客户端获取不是单调递增的ID时,它会意识到它已经失去同步,因此会丢弃所有socket消息并通过REST api请求所有观察者的重新同步。

    通过这种方式,我们可以处理离线期间应用程序状态的多种变化,而无需在重新连接时解析大量的websocket消息,并且我们确信已同步(因为最后一次同步日期仅由REST API设置,而不是来自socket)。

    唯一棘手的部分是从调用REST API到服务器响应之间监视实时消息,因为从数据库读取的内容需要时间才能返回到客户端,而在此期间可能会发生变化,因此需要缓存并考虑这些变化。

    我们将在几个月内进入生产阶段,希望到那时我可以好好睡觉 :)


    1
    似乎您已经拥有用户账户系统。您知道哪些账户是在线/离线的,您可以处理连接/断开事件:
    因此解决方案是,为每个用户在数据库中添加在线/离线和离线消息:
    chatApp.onLogin(function (user) {
       user.readOfflineMessage(function (msgs) {
           user.sendOfflineMessage(msgs, function (err) {
               if (!err) user.clearOfflineMessage();
           });
       })
    });
    
    chatApp.onMessage(function (fromUser, toUser, msg) {
       if (user.isOnline()) {
          toUser.sendMessage(msg, function (err) {
              // alert CAN NOT SEND, RETRY?
          });
       } else {
          toUser.addToOfflineQueue(msg);
       }
    })
    

    请阅读我问题中的“附加说明1”部分。我认为你的答案不是一个解决方案。 - igorpavlov
    很有趣,我现在开始自己的聊天项目,也许会用Web RTC :-> - damphat
    在WebRTC上也可以挖掘。但在这种情况下并不重要。啊...如果所有人都有稳定的互联网连接...当用户在Speedtest上拥有100Mbps,但实际上尝试Ping时却有20%的数据包丢失,我感到非常沮丧。谁需要这样的互联网呢?=) - igorpavlov

    0
    我认为你想要的是为每个用户创建一个可重用的套接字,类似于:
    客户端:
    socket.on("msg", function(){
        socket.send("msg-conf");
    });
    

    服务器:

    // Add this socket property to all users, with your existing user system
    user.socket = {
        messages:[],
        io:null
    }
    user.send = function(msg){ // Call this method to send a message
        if(this.socket.io){ // this.io will be set to null when dissconnected
            // Wait For Confirmation that message was sent.
            var hasconf = false;
            this.socket.io.on("msg-conf", function(data){
                // Expect the client to emit "msg-conf"
                hasconf = true;
            });
            // send the message
            this.socket.io.send("msg", msg); // if connected, call socket.io's send method
            setTimeout(function(){
                if(!hasconf){
                    this.socket = null; // If the client did not respond, mark them as offline.
                    this.socket.messages.push(msg); // Add it to the queue
                }
            }, 60 * 1000); // Make sure this is the same as your timeout.
    
        } else {
            this.socket.messages.push(msg); // Otherwise, it's offline. Add it to the message queue
        }
    }
    user.flush = function(){ // Call this when user comes back online
        for(var msg in this.socket.messages){ // For every message in the queue, send it.
            this.send(msg);
        }
    }
    // Make Sure this runs whenever the user gets logged in/comes online
    user.onconnect = function(socket){
        this.socket.io = socket; // Set the socket.io socket
        this.flush(); // Send all messages that are waiting
    }
    // Make sure this is called when the user disconnects/logs out
    user.disconnect = function(){
        self.socket.io = null; // Set the socket to null, so any messages are queued not send.
    }
    

    然后在断开连接之间保留套接字队列。

    确保将每个用户的socket属性保存到数据库中,并将这些方法作为用户原型的一部分。数据库不重要,只需按照您一直保存用户的方式进行保存即可。

    这将避免Additon 1中提到的问题,因为它要求客户在将消息标记为已发送之前确认。如果您真的想这样做,可以为每条消息分配一个ID,并让客户端将消息ID发送到msg-conf,然后进行检查。

    在此示例中,user是所有用户都从中复制或类似于用户原型的模板用户。

    注意:这尚未经过测试。


    其实,我认为你已经回答了我的问题。但是,能否给每段代码添加一些注释呢?我还不太明白如何将其集成到我的代码中。此外,应该将其保存在哪个数据库中以及您指的是哪种类型的数据库?Redis还是Mongo,或者无所谓? - igorpavlov
    它仍然没有解决问题。当消息发送时,服务器上的两个用户(发送者和接收者)都在线。请仔细阅读我的问题中的附加说明1。在这种情况下,“this.socket.io”将始终为“true”,因此消息正在发送但未被接收。您试图解决的问题是当发送者离线而不是接收者时。或者我错了吗? - igorpavlov
    1
    我相信@igorpavlov是对的。会有一段时间,客户端实际上已经断开了连接,但是服务器并不知道,因为心跳尚未发生。在此期间,“this.socket.io”将不是“null”,服务器将尝试传递消息。 - Michelle Tilley
    哦,那个还是不行。如果心跳为10秒,并且U1在心跳超时之前19秒向U2发送消息(在U2实际失去连接之前9秒),它仍然无法工作。而且它也会完全破坏“即时”体验。@Brandon-Tilley最终解决了这个问题,请看他的答案。 - igorpavlov
    @igorpavlov,太好了,另外请查看我的更新答案以获得可行的解决方案。 - Ari Porad
    显示剩余4条评论

    0

    很有趣,我找不到这个问题,但是谷歌了几个小时。我会看一下! - igorpavlov
    看起来我已经使用了这种架构。但它并没有解决我所描述的确切问题。 - igorpavlov

    -2

    最近一直在研究这些东西,认为可能有更好的方法。

    试着看看Azure Service Bus,队列和主题可以处理离线状态。消息等待用户回来,然后他们就能收到消息。

    运行队列需要成本,但基本队列每百万次操作只需0.05美元,因此从编写队列系统所需的工作时间来看,开发成本会更高。 https://azure.microsoft.com/en-us/pricing/details/service-bus/

    Azure Bus还提供了PHP、C#、Xarmin、Anjular、JavaScript等语言的库和示例。

    因此,服务器发送消息后无需担心跟踪它们。客户端也可以使用消息进行回传,以便在需要时处理负载平衡。


    这对我来说看起来像是产品推广。有人可能会觉得这很有帮助,但这甚至不是一项技术,而是一个整个的服务,而且还需要付费。 - igorpavlov

    -2

    尝试使用emit聊天列表

    io.on('connect', onConnect);
    
    function onConnect(socket){
    
      // sending to the client
      socket.emit('hello', 'can you hear me?', 1, 2, 'abc');
    
      // sending to all clients except sender
      socket.broadcast.emit('broadcast', 'hello friends!');
    
      // sending to all clients in 'game' room except sender
      socket.to('game').emit('nice game', "let's play a game");
    
      // sending to all clients in 'game1' and/or in 'game2' room, except sender
      socket.to('game1').to('game2').emit('nice game', "let's play a game (too)");
    
      // sending to all clients in 'game' room, including sender
      io.in('game').emit('big-announcement', 'the game will start soon');
    
      // sending to all clients in namespace 'myNamespace', including sender
      io.of('myNamespace').emit('bigger-announcement', 'the tournament will start soon');
    
      // sending to individual socketid (private message)
      socket.to(<socketid>).emit('hey', 'I just met you');
    
      // sending with acknowledgement
      socket.emit('question', 'do you think so?', function (answer) {});
    
      // sending without compression
      socket.compress(false).emit('uncompressed', "that's rough");
    
      // sending a message that might be dropped if the client is not ready to receive messages
      socket.volatile.emit('maybe', 'do you really need it?');
    
      // sending to all clients on this node (when using multiple nodes)
      io.local.emit('hi', 'my lovely babies');
    
    };


    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接