使用Crystal/Kemal监听UDP数据包

4
我一直在尝试使用Crystal和Kemal创建一个非阻塞服务器,它可以(a)监听发送到它的UDP消息流,并且(b)将该消息转发给已经建立ws连接的任何浏览器的WebSocket。
到目前为止,我最好的解决方案是:
require "kemal"
require "socket"

server = UDPSocket.new
server.bind "localhost", 1234
puts "Started..."

ws "/" do |socket|

    udp_working = true

    while udp_working
        message, client_addr = server.receive
        socket.send message
    end

    socket.on_close do
        puts "Goodbye..."
        udp_working = false
    end
end

这一切似乎有点不太优雅,而且实际上并不能按预期工作,因为:
  • 在启动Crystal服务器和第一个Web浏览器连接到Crystal服务器之间发送的所有UDP数据包都被缓存,并以一个巨大的滞留事件发送
  • 不能正确处理从WebSockets断开连接的浏览器,即未触发socket.on_close,循环持续直至我终止Crystal服务器
我希望能够使用server.on_message类型处理方式,在接收到UDP数据包时才运行代码,而不是连续轮询阻塞服务器。是否有其他方法可以使用Crystal/Kemal来实现这一点?
谢谢!
2个回答

3
你的方法存在几个问题:
首先,因为这一行代码永远不会被执行,所以socket.on_close无法工作。while循环将一直运行,只要udp_working == true,而它只会在on_close钩子中设置为false
如果你不想让UDP数据包堆积,你需要从一开始就接收它们,并在没有websocket连接时进行处理(也许是丢弃?)。UDPServer没有on_message钩子,但receive已经是非阻塞的。因此,你可以在一个循环中运行它(在自己的纤程中),并在方法返回时采取行动。有关详细信息,请参见Crystal Concurrency;还有一个使用TCPSocket的示例,但在这方面,UDP应该是类似的。

3
Crystal会为您处理非阻塞,您需要在单独的fiber上编写阻塞代码并通过通道进行通信。Crystal将在后台使用非阻塞代码和select()调用。
此外,您需要一个框架或一些自己的代码来将接收到的消息复制到每个正在侦听的WebSocket。这通常称为发布/订阅或pub/sub。

ws "/" do |socket|

   udp_working = true

   while udp_working
       message, client_addr = server.receive
       socket.send message
   end

   socket.on_close do
       puts "Goodbye..."
       udp_working = false
   end
end
您需要先运行 socket.on_close,否则事件处理程序不会被添加,因此直到循环之后才会运行,但由于此原因循环实际上是无限的。此外,如果在检查 udp_working 变量和调用 #send 之间关闭套接字,则有小概率会出现 socket.send message 错误。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接