在Rails线程中访问变量

4
我正在构建一个基于Web的幻灯片应用程序,其中一个“主”用户可以在幻灯片之间移动,每个人的浏览器都会跟随。为此,我使用Websockets和Redis进行全局通道消息传递。每个连接的客户端都有其信息存储在一个数组@clients中。然后我有一个单独的线程用于订阅Redis频道,在其中定义了一个'on.message'块,应该向@clients数组中的每个人发送消息,但是该数组在此块内为空(在模块的任何其他地方不为空)。
基本上遵循这个示例:https://devcenter.heroku.com/articles/ruby-websockets 相关代码位于自定义中间件类中:
require 'faye/websocket'
require 'redis'

class WsCommunication
  KEEPALIVE_TIME = 15 #seconds
  CHANNEL = 'vip-deck'

  def initialize(app)
    @app = app
    @clients = []

    uri = URI.parse(ENV['REDISCLOUD_URL'])
    Thread.new do
      redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
      redis_sub.subscribe(CHANNEL) do |on|
        on.message do |channel, msg|
          puts @clients.count
          ### prints '0,' no clients receive msg
          @clients.each { |ws| ws.send(msg) }
        end
      end
    end
  end

  def call(env)
    if Faye::WebSocket.websocket?(env)
    ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})
  
    ws.on :open do |event|
      @clients << ws
      puts @clients.count
      ### prints actual number of clients
    end

    ws.on :message do |event|
      $redis.publish(CHANNEL, event.data)
    end

    ws.on :close do |event|
      @clients.delete(ws)
      ws = nil
    end

    ws.rack_response
  else
    @app.call(env)
  end
end
end

当在新线程中访问@clients数组时,它是否为空是因为实例变量不能在线程之间共享?如果是这样,我该如何在线程之间共享变量?

我还尝试使用$clients(全局变量,应该可以在线程之间访问),但没有成功。


1
@kfrz 这是 Ruby,不是 Python;为什么要在单独的线程中执行这个操作?还有一个问题,为什么要手动执行这个操作?如果你使用 Rails 5.0,你可以使用 actioncable,它可以解决所有的问题。 - siegy22
看看这个是否有帮助:https://dev59.com/VXLYa4cB1Zd3GeqPTRRh - DhruvPathak
@RaVeN Redis的代码是阻塞的,它永远不会返回控制权。因此,线程对其操作是必要的。ActionCable是一个不错的解决方案,在所有客户端都是JavaScript的情况下可以使用。但是,这个解决方案更通用,应该能够处理支持WebSockets的任何客户端。 - Richard_G
2个回答

0

在所有线程中应该共享@client,您确定客户端没有意外从数组中删除吗?尝试在ws.on :close块中放置“client deleted”并进行测试。 此外,您可以尝试在使用@client变量的地方使用互斥锁,例如: http://ruby-doc.org/core-2.2.0/Mutex.html


0

最新编辑:在结尾处展示了可运行的代码。主模块未经修改,除了调试代码。注意:我已经遇到了我已经提到的需要在终止之前取消订阅的问题。

代码看起来正确。我想看看你是如何实例化它的。

在config/application.rb中,你可能至少有以下内容:

require 'ws_communication'
config.middleware.use WsCommunication

然后,在您的 JavaScript 客户端中,应该有类似以下的代码:

var ws = new WebSocket(uri);

你会实例化另一个 WsCommunication 实例吗?这样会将 @clients 设置为空数组,并且可能会出现你所描述的问题。下面这样的代码是错误的:
var ws = new WsCommunication;

如果这篇文章没有帮助到您,展示客户端和config/application.rb会对我们有所帮助。

顺便说一下,我同意评论中的观点,即在任何更新时都应该通过互斥锁来保护@clients,如果不是读取也是如此。它是一个动态结构,在事件驱动系统中随时可能发生变化。redis-mutex是一个不错的选择。(希望链接正确,因为Github似乎在目前的情况下会出现500错误。)

您还可以注意到$redis.publish返回接收消息的客户端数量的整数值。

最后,您可能需要确保在终止之前取消订阅您的频道。我曾经遇到过这样的情况,由于早期订阅了相同频道而没有清理,导致每条消息发送多次,甚至很多次。由于您正在线程中订阅频道,因此您需要在同一线程中取消订阅,否则进程将只是“挂起”,等待正确的线程神奇地出现。我通过设置“取消订阅”标志然后发送消息来处理这种情况。然后,在on.message块中,我测试取消订阅标志并在那里发布取消订阅。

您提供的模块,只需要进行一些小的调试修改:

require 'faye/websocket'
require 'redis'

class WsCommunication
  KEEPALIVE_TIME = 15 #seconds
  CHANNEL = 'vip-deck'

  def initialize(app)
    @app = app
    @clients = []
    uri = URI.parse(ENV['REDISCLOUD_URL'])
    $redis = Redis.new(host: uri.host, port: uri.port, password: uri.password)
    Thread.new do
      redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
      redis_sub.subscribe(CHANNEL) do |on|
        on.message do |channel, msg|
          puts "Message event. Clients receiving:#{@clients.count};"
          @clients.each { |ws| ws.send(msg) }
        end
      end
    end
  end

  def call(env)
    if Faye::WebSocket.websocket?(env)
      ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})

      ws.on :open do |event|
        @clients << ws
        puts "Open event. Clients open:#{@clients.count};"
      end

      ws.on :message do |event|
        receivers = $redis.publish(CHANNEL, event.data)
        puts "Message published:#{event.data}; Receivers:#{receivers};"
      end

      ws.on :close do |event|
        @clients.delete(ws)
        puts "Close event. Clients open:#{@clients.count};"
        ws = nil
      end

      ws.rack_response
    else
      @app.call(env)
    end
  end
end

我提供的测试订阅者代码:
# encoding: UTF-8
puts "Starting client-subscriber.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'websocket-client-simple'

puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"

url = ARGV.shift || 'ws://localhost:3000'

EM.run do

  ws = WebSocket::Client::Simple.connect url

  ws.on :message do |msg|
    puts msg
  end

  ws.on :open do
    puts "-- Subscriber open (#{ws.url})"
  end

  ws.on :close do |e|
    puts "-- Subscriber close (#{e.inspect})"
    exit 1
  end

  ws.on :error do |e|
    puts "-- Subscriber error (#{e.inspect})"
  end

end

我提供了测试发布者代码。发布者和订阅者可以很容易地合并,因为这只是测试。
# encoding: UTF-8
puts "Starting client-publisher.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'json'
require 'websocket-client-simple'

puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"

url = ARGV.shift || 'ws://localhost:3000'

EM.run do
  count ||= 0
  timer = EventMachine.add_periodic_timer(5+rand(5)) do
    count += 1
    send({"MESSAGE": "COUNT:#{count};"})
  end

  @ws = WebSocket::Client::Simple.connect url

  @ws.on :message do |msg|
    puts msg
  end

  @ws.on :open do
    puts "-- Publisher open"
  end

  @ws.on :close do |e|
    puts "-- Publisher close (#{e.inspect})"
    exit 1
  end

  @ws.on :error do |e|
    puts "-- Publisher error (#{e.inspect})"
    @ws.close
  end

  def self.send message
    payload = message.is_a?(Hash) ? message : {payload: message}
    @ws.send(payload.to_json)
  end
end

一个在Rack中间件层运行所有这些的示例config.ru:
require './controllers/main'
require './middlewares/ws_communication'
use WsCommunication
run Main.new

这是主程序。我将其从我的运行版本中剥离出来,因此如果您使用它,可能需要进行微调:

%w(rubygems bundler sinatra/base json erb).each { |m| require m }
ENV['RACK_ENV'] ||= 'development'
Bundler.require
$: << File.expand_path('../', __FILE__)
$: << File.expand_path('../lib', __FILE__)

Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']

  class Main < Sinatra::Base

    env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
    get "/" do
      erb :"index.html"
    end

    get "/assets/js/application.js" do
      content_type :js
      @scheme = env == "production" ? "wss://" : "ws://"
      erb :"application.js"
    end
  end

嘿,感谢您提供的所有信息!我会将这个标记为答案,因为即使我的项目需求发生了变化,我也能以不同的方式完成所需的工作,这些信息非常有用。 - volx757

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接