socket.io客户端在长时间Node.js函数中自动断开连接。

5
我正在使用socket.io将我的应用程序的swift客户端与服务器进行通信。实际上,客户端在打开应用程序时加入套接字连接,并立即向Redis队列添加作业(需要几秒钟到15秒左右)。服务器会向客户端发送作业ID的响应。在处理此作业时,有时客户端会断开连接。似乎没有任何规律可言,因为断开连接的时间完全不一致,也不像是在函数的特定点发生断开连接。我以为可能是手动从客户端断开连接,因此在每次客户端断开连接之前设置了套接字发射器(当这些发射器被发送到服务器时,服务器会打印一些内容,告诉我断开连接来自哪里)。这表明断开连接是自动的,因为在结束套接字连接之前,客户端从未收到发射。这在Heroku上运行。这是我的代码:
//queue initialization
const queue = new Queue('queue', process.env.REDIS_URL)

//client pings this endpoint to get the job id in the queue
app.post('/process', async function(request, response) {
  let job = await queue({request: request.body});
  console.log("Logging job as " + job.id)
  response.json({ id: job.id });
});

queue.process(10, async (job) => { //10 is the max workers per job
    console.log("Started processing")
    const client = await pool.connect()
    let item = job.data.request
    let title = item.title
    let subtitle = item.subtitle
    let id = item.id
    io.to(id).emit("Processing1", ""); //added emissions like these because I thought maybe the socket was timing out, but this didn't help
    console.log("Processing1");

    try {
      await client.query('BEGIN')
        let geoData = await //promise of geocoding endpoint api function
        let lengthOfGeoData = geoData.context.length
        io.to(id).emit("Processing2", "");
        console.log("Processing2");
        var municipality = ""
        var area = ""
        var locality = ""
        var place = ""
        var district = ""
        var region = ""
        var country = ""
        //for loop to go through geoData and set the above values
      if (municipality != "") {
        console.log("Signing in from " + municipality + ", " + area);
      } else {
        console.log("Signing in from " + area)
      }
      await scrape(municipality, area, id);
      await client.query('COMMIT')
    } catch(err) {
      await client.query('ROLLBACK')
      console.log(err)
    }
    try {
      await client.query('BEGIN')
      const array = await //a function that queries a Postgres db for some rows, makes json objects out of them, and pushes to the 'array' variable
      var array2 = []
      for (a of array) {
        let difference = getDifference(title, subtitle, a.title, a.subtitle) //math function
        if (difference <= 10) {
          array.push(a)
        }
      }
      io.to(id).emit("Processing9", "");
      console.log("Processing9");
      await client.query('COMMIT')
    } catch(err) {
      await client.query('ROLLBACK')
      console.log("ERROR: Failed arrayHelperFunction")
      console.log(err)
    } finally {
      client.release()
      console.log("About to emit this ish to " + id) //should emit to socket here ideally to notify that the processing is done and results can be polled
      io.to(id).emit("finishedLoading", "")
      return array2;
    }
});

//when the client polls the queue after it's received the 'done' notifier from the server
app.post('/poll', async function(request, response) {
  console.log("Polling")
  let id = request.body.id
  const results = await queue(id);
  for (r of results.returnvalue) {
    console.log("Sending " + r.title);
  }
  response.send(results.returnvalue)
});

//scrape
async function scrape(municipality, area, id) {
  const client = await pool.connect();
  try {
    await client.query('BEGIN')
    var location = ""
    if (municipality != "") {
      location = municipality + ", " + area
    } else {
      location = area
    }
    let inDatabase = await client.query('SQL statement AS it_does_exist', [params]);
    io.to(id).emit("Processing3", "");
    console.log("Processing3");
    if (inDatabase.rows[0].it_does_exist == false) { 
      let query = "book clubs near " + location
      var terminationTime = new Date()
      terminationTime.setHours(terminationTime.getHours() + 4);
      let date = ("0" + terminationTime.getDate()).slice(-2);
      let month = ("0" + (terminationTime.getMonth() + 1)).slice(-2);
      let year = terminationTime.getFullYear();
      let hours = terminationTime.getHours();
      let minutes = terminationTime.getMinutes();
      let seconds = terminationTime.getSeconds();
      let timestamp = year + "-" + month + "-" + date + " " + hours + ":" + minutes + ":" + seconds

      try {
        await client.query(`SQL statement`, [params]);
      } catch(err) {
        console.log("FAILURE: scrape() at 1.")
        console.log(err)
      }

      var queryLocation = "New York,New York,United States" //default search origination is here
      var queryGLCode = "US"
      io.to(id).emit("Processing4", "");
      console.log("Processing4");
      try {
        await fetch('https://serpapi.com/locations.json?q='+municipality+'&limit=10', { method : "GET" })
          .then(res => res.json())
          .then((json) => {
            for (let index = 0; index < 10; index++) {
              let locationAPIName = json[index].canonical_name
              let locationAPICode = json[index].country_code
              let resultLatitude = json[index].gps[1];
              let resultLongitude = json[index].gps[0];
            }
          });
      } catch(err) {
        console.log("FAILURE: scrape() at 2.")
        console.log(err)
      }
      io.to(id).emit("Processing5", "");
      console.log("Processing5");
      try {
        await Promise.all([
          searchEvents({engine: "google_events", q: query, location: queryLocation, hl: "en", gl: queryGLCode}).then(data => async function(){
            try {
              await client.query('BEGIN');
              let results = data.events_results
              if (results != null) {
                console.log("first HAD results")
                for (result of results) {
                  var fixedAddress = result.address[0]
                  let address = fixedAddress + ", " + result.address[1]
                      
                  let title = result.title + address

                  var description = result.description

                  let geoData = await geocode(address); //mapbox geocode the address
                  let latitude = Number(geoData.center[0]);
                  let longitude = Number(geoData.center[1]);
                  
                    await client.query(`SQL statement`, [params]);
                  
                }
                io.to(id).emit("Processing6", "");
                console.log("Processing6");
              } else {
                console.log("first DID NOT have results")
              }
              console.log("FIRST BLOCK")
              await client.query('COMMIT');
            } catch(err) {
              console.log("Results[0] not found.")
              console.log(err)
              await client.query('ROLLBACK');
            }
          }()),

          searchEvents({engine: "google_events", q: query, location: queryLocation, hl: "en", gl: queryGLCode, start: "10"}).then(data => async function(){
            // same as the one above, just with an offset
          }()),

          searchEvents({engine: "google_events", q: query, location: queryLocation, hl: "en", gl: queryGLCode, start: "20"}).then(data => async function(){
            // same as the one above, but with a different offset
          }())
        ])
      } catch(err) {
        console.log("FAILURE: scrape() at 3.")
        console.log(err)
      }

    } else {
      console.log("Location already in the database.")
    }
    await client.query('COMMIT')
  } catch(err) {
    await client.query('ROLLBACK')
    console.log(err)
  } finally {
    client.release()
    return "Resolved";
  }
}

//Client establish socket connection
func establishConnection(_ completion: (() -> Void)? = nil) {
    let socketUrlString: String = appState.server
    self.manager = SocketManager(socketURL: URL(string: socketUrlString)!, config: [.log(false), .reconnects(true), .extraHeaders(["header": "customheader"])])
    self.socket = manager?.defaultSocket
    self.socket?.connect()
    self.socket?.once(clientEvent: .connect, callback: { (data, emitter) in
        if completion != nil{
            completion!()
        }
    })
  //other socket functions
}

//Client initial post request
func process() {
    let server = "serverstring" + "process"
    let title = "title"
    let subtitle = "subtitle"
    let package = BookPackage(title: title, subtitle: subtitle, id: mySocketID) //this is after the initial connection
    print("package is \(package)")
            
    guard let url  = URL(string: server) else { return }

    var urlRequest = URLRequest(url: url)
    
    urlRequest.addValue("application/json", forHTTPHeaderField: "Content-Type")
    urlRequest.addValue("application/json", forHTTPHeaderField: "Accept")
    
    urlRequest.httpMethod = "POST"
    
    guard let data = try? JSONEncoder().encode(package) else { return }
            
    urlRequest.httpBody = data

    let task = URLSession.shared.dataTask(with: urlRequest) {
        (data, response, error) in
        if let error = error {
            print(error)
            return
        }
        guard let data = data else { return }
        guard let dataString = String(data: data, encoding: String.Encoding.utf8) else { return }
        let jsonData = Data(dataString.utf8)
        var decodedJob: Job? = nil
        do {
            decodedJob = try JSONDecoder().decode(Job.self, from: jsonData) //Job is just a struct in the same form as the json object sent back from the server
        } catch {
            print(error.localizedDescription)
        }
        DispatchQueue.main.async {
            self.appState.pendingJob = decodedJob
        }
    }
    // start the task
    task.resume()
}

这个bug唯一的规律是用户断开连接前日志记录(附注:'reason of disconnect'和'DISCONNECTED USER'在socket.on('disconnect')事件触发时被调用):

https://istack.dev59.com/7fjuU.webp

https://istack.dev59.com/z5bmL.webp

https://istack.dev59.com/aHNt3.webp

https://istack.dev59.com/64WYI.webp


1
请提供提供提供队列类的模块链接。 - jfriend00
2
那么,客户端代码运行在iPhone上(因为它看起来像Swift代码)?也许iPhone断开了一个非活动的socket.io连接,因为它断言了电源管理问题?这似乎是一个客户端问题。我建议搜索iOS中与socket.io连接断开相关的问题,并查看您找到的无数命中是否看起来像您的情况。 - jfriend00
2
设置正确的环境变量将自动在服务器上启用调试。对于客户端,似乎必须设置一个localStorage值。这个想法是调试代码已经在socket.io中了,你只需要使用正确的设置来启用它。你可能需要使用正确的设置重新启动服务器才能生效。 - jfriend00
1
请问出现了什么具体的错误信息?在哪个部分的抓取函数中发生了这个断开连接的问题? - Gandalf The Grey
1
你的前端和后端都使用了哪个版本的socket.io?如果它们的版本不同,那可能会导致问题。同时也要检查一下你的Node版本。 - Gandalf The Grey
显示剩余23条评论
3个回答

1

你的问题的解决方案是在启动服务器时修改pingTimeout。

来自Socket.io:

服务器发送一个ping,如果客户端在pingTimeout毫秒内没有回复pong,则服务器认为连接已关闭。

同样,如果客户端在pingInterval + pingTimeout毫秒内没有收到服务器的ping,则客户端也认为连接已关闭。

const io = new Server(httpServer, {
  pingTimeout: 30000
});

抱歉,我已尝试此解决方案,但它不起作用(尝试将 pingTimeout 设置为 50000 毫秒)。运行 DEBUG 模式后,我可以看到断开连接的原因是“客户端关闭,原因为传输关闭”,如果这是问题的原因,则原因应为“ping 超时”。 - nickcoding2

1
你应该使用await来阻塞事件循环。客户端定期发送心跳信号(由pingTimeout定义)。如果服务器没有收到心跳,则会断开连接。你应该将这个过程隔离出来。要么找到一种方法在worker/后台进程中使用它,要么使用异步方式,此外,在服务器端增加pingTimeout可能会有所帮助。

我相信你迄今为止给出了最正确的答案(这也是我给你悬赏的原因),但对我来说,这不是解决方案。 - nickcoding2
@nickcoding2 我明白了,谢谢你。但是执行得怎么样?你能在代码执行开始时启动一个计时器吗? 你可以使用 console.time(label); console.timeEnd(); 此外,您可以在每次发送心跳时将其输出到控制台。 - siniradam

0

您可以将传输方式从默认值更改为

 const io = new Server(httpServer, {
  transports: ['polling', 'websocket'],
});

这可能会解决问题,否则您也可以尝试更改 upgradeTimeoutpingTimeout


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接