如何在使用Sinatra服务器发送事件流时提高并发性能

3
我正在构建一个Rack中间件,订阅Redis频道,并使用服务器发送事件将消息推送到客户端。Sinatra为此提供了一个不错的DSL。我有一个工作示例,但是当我达到7或8个客户端时,性能会显著下降。我在尝试重用请求之间的Redis连接时也遇到了“死锁”服务器的问题。
我正在使用Thin来提供应用程序服务(其底层使用EventMachine)。我认为Sinatra DSL已经处理了事件机制的并发性,但也许这是我需要自己实现的东西?我不想限制自己只使用基于EventMachine的服务器(如Thin、Rainbows!),以防某人想使用多线程服务器,例如Puma。我该怎么做才能增加代码的并发性?
require 'redis'
require 'sinatra/base'

class SSE < Sinatra::Base

  def send_message(json)
    "id: #{Time.now}\n" +
    "data: #{json}" +
    "\r\n\n"
  end

  get '/channels/:id/subscribe', provides: 'text/event-stream' do
    channel_id = params['id']
    stream(:keep_open) do |connection|
      Redis.new.subscribe("channels:#{channel_id}") do |on|
        on.message do |channel, json|
          connection << send_message(json)
        end
      end
    end
  end

end
2个回答

1

我有几个想法,所以我将按无部分顺序列出它们。

我正在使用Thin来服务于应用程序(其在幕后使用EventMachine)。我认为Sinatra DSL已经处理了与EventMachine的并发,但也许这是我需要自己实现的东西?

你是对的,Thin使用EventMachine。然而,与EventMachine(或任何其他反应器)有关的问题在于,一旦执行同步操作,你就会停止整个反应器。因此,要真正获得你期望的并发性,你需要在整个应用程序中继续使用EventMachine。

查看em-hiredis,这是一个支持发布/订阅的EventMachine准备好的Redis客户端。

我不想局限于只使用基于EventMachine的服务器(如Thin、Rainbows!),以防有人想使用多线程服务器如Puma

我从未尝试过我即将说的话,但我认为你在不使用EventMachine的服务器中使用EventMachine不会有问题。只需记住启动自己的EM即可。也许在config.ru中?

我也遇到了在试图重用Redis连接时“死锁”服务器的问题。
我认为你经历这个问题的原因是每次调用'/channels/:id/subscribe'都会打开一个新的Redis连接。你只能打开有限数量的连接。考虑将 Redis.new 重构为应用程序的共享连接。仅打开一次。单个Redis连接应该能够处理多个发布/订阅。
这只是一些想法,希望它们有所帮助。

1
经过大量的研究和实验,这是我在sinatra + sinatra sse gem中使用的代码:
class EventServer < Sinatra::Base
 include Sinatra::SSE
 set :connections, []
 .
 .
 .
 get '/channel/:channel' do
 .
 .
 .
  sse_stream do |out|
    settings.connections << out
    out.callback {
      puts 'Client disconnected from sse';
      settings.connections.delete(out);
    }
  redis.subscribe(channel) do |on|
      on.subscribe do |channel, subscriptions|
        puts "Subscribed to redis ##{channel}\n"
      end
      on.message do |channel, message|
        puts "Message from redis ##{channel}: #{message}\n"
        message = JSON.parse(message)
        .
        .
        .
        if settings.connections.include?(out)
          out.push(message)
        else
          puts 'closing orphaned redis connection'
          redis.unsubscribe
        end
      end
    end
  end
end

Redis连接在.on.message上阻塞,并且仅接受(p)subscribe/(p)unsubscribe命令。一旦取消订阅,Redis连接将不再被阻塞,并且可以由最初的SSE请求实例化的Web服务器对象释放。当您在Redis上收到消息并且与浏览器的SSE连接不再存在于集合数组中时,它会自动清除。

谢谢你提供这个好的例子,唯一让我困扰了很久的是只有第一个连接按预期工作。经过研究,我发现应该在每个请求中调用Redis.new,否则只有第一个会被处理。 - Max Prokopov

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接