Accessing a variable within a Rails thread

Posted 2019-06-24 03:36

I'm building an application for a web-based slide show, where one 'master' user can move between slides and everyone's browsers follow along. To do this, I'm using websockets, with Redis as a global channel to send messages through. Each client that connects has its info stored in an array, @clients. A separate thread subscribes to the Redis channel, and inside it an 'on.message' block should send each message to everyone in the @clients array. However, that array is empty inside this block (it is not empty anywhere else in the module).

Pretty much following this example: https://devcenter.heroku.com/articles/ruby-websockets

The relevant code, which is in a custom middleware class:

require 'faye/websocket'
require 'redis'

class WsCommunication
  KEEPALIVE_TIME = 15 #seconds
  CHANNEL = 'vip-deck'

  def initialize(app)
    @app = app
    @clients = []

    uri = URI.parse(ENV['REDISCLOUD_URL'])
    Thread.new do
      redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
      redis_sub.subscribe(CHANNEL) do |on|
        on.message do |channel, msg|
          puts @clients.count
          ### prints '0,' no clients receive msg
          @clients.each { |ws| ws.send(msg) }
        end
      end
    end
  end

  def call(env)
    if Faye::WebSocket.websocket?(env)
      ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})

      ws.on :open do |event|
        @clients << ws
        puts @clients.count
        ### prints actual number of clients
      end

      ws.on :message do |event|
        $redis.publish(CHANNEL, event.data)
      end

      ws.on :close do |event|
        @clients.delete(ws)
        ws = nil
      end

      ws.rack_response
    else
      @app.call(env)
    end
  end
end

Is the @clients array empty when accessed inside the new thread because instance variables aren't shared across threads? If so, how do I share a variable across threads?

I have also tried using $clients (global variable, should be accessible across threads), to no avail.

EDIT: man this site is so full of point-grabbers. no one answers anymore they just make super minor edits to get rep.

2 answers
Aperson
Reply #2 · 2019-06-24 03:56

UPDATED EDIT AT END: Shows working code. Main module unmodified except for debugging code. Note: I did experience the issue I already noted regarding the need to unsubscribe prior to termination.

The code looks correct. I'd like to see how you are instantiating it.

In config/application.rb, you probably have at least something like:

require 'ws_communication'
config.middleware.use WsCommunication

Then, in your JavaScript client, you should have something like this:

var ws = new WebSocket(uri);

Do you instantiate another instance of WsCommunication? That would set @clients to an empty array and could exhibit your symptoms. Something like this would be incorrect:

var ws = new WsCommunication;

It would help us if you would show the client and, perhaps, config/application.rb if this post does not help.
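To illustrate why a second instance would produce exactly this symptom, here is a minimal sketch. `ClientRegistry` is a hypothetical stand-in that keeps only the @clients bookkeeping; it is not the middleware itself. Each instance gets its own array, so a client registered on one is invisible to the other.

```ruby
# Hypothetical stand-in for the middleware: only the @clients bookkeeping is kept.
class ClientRegistry
  def initialize
    @clients = []
    # Logging the object_id makes it visible when initialize runs more than once.
    puts "initialize called, object_id=#{object_id}"
  end

  def add(client)
    @clients << client
  end

  def count
    @clients.count
  end
end

first  = ClientRegistry.new
second = ClientRegistry.new

first.add(:browser_a)

puts first.count   # clients registered on the first instance
puts second.count  # the second instance never saw that client
```

Adding a similar `puts` with `object_id` to WsCommunication#initialize is a cheap way to check whether the middleware is being built more than once in your setup.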

By the way, I agree with the comment that @clients should be protected by a mutex on every update, and possibly on reads as well. It's a dynamic structure that could change at any time in an event-driven system. redis-mutex is a good option. (Hope that link is correct, as GitHub seems to be throwing 500 errors on everything at the moment.)
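For an array that lives inside a single process, Ruby's built-in Mutex may already be enough. The following is a sketch under that assumption; `ClientList` and its method names are hypothetical and do not appear in the original code. Iteration works on a snapshot so the lock is not held while messages are being sent.

```ruby
# Hypothetical helper; the original code keeps a bare array in @clients.
class ClientList
  def initialize
    @clients = []
    @lock = Mutex.new
  end

  def add(ws)
    @lock.synchronize { @clients << ws }
  end

  def remove(ws)
    @lock.synchronize { @clients.delete(ws) }
  end

  # Iterate over a copy so the lock is released before any sending happens.
  def each(&block)
    snapshot = @lock.synchronize { @clients.dup }
    snapshot.each(&block)
  end

  def count
    @lock.synchronize { @clients.size }
  end
end

# Ten threads register 100 clients each.
list = ClientList.new
threads = 10.times.map do |i|
  Thread.new { 100.times { |j| list.add("client-#{i}-#{j}") } }
end
threads.each(&:join)

puts list.count
```

In the middleware, `@clients = ClientList.new` could then replace the bare array, with `add` in the :open handler, `remove` in the :close handler, and `each` in the on.message block.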

You might also note that $redis.publish returns an integer value of the number of clients that received the message.

Finally, you might find that you need to ensure that your channel is unsubscribed before termination. I've had situations where I've ended up sending each message multiple, even many, times because of earlier subscriptions to the same channel that weren't cleaned up. Since you are subscribing to the channel within a thread, you will need to unsubscribe within that same thread or the process will just "hang" waiting for the right thread to magically appear. I handle that situation by setting an "unsubscribe" flag and then sending a message. Then, within the on.message block, I test for the unsubscribe flag and issue the unsubscribe there.
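The flag-plus-message shutdown described above can be sketched roughly as follows. `StoppableSubscriber` is a hypothetical name, and `FakeConnection` is only an in-memory stand-in so the sketch runs without a server; it is not part of redis-rb. The assumption is that the real connection behaves like a redis-rb client, where `unsubscribe` may be called from inside the `on.message` block.

```ruby
# Sketch of the "flag plus wake-up message" shutdown.
# `connection` is assumed to behave like a redis-rb client.
class StoppableSubscriber
  def initialize(connection, channel, &handler)
    @connection = connection
    @channel = channel
    @handler = handler
    @stop = false
  end

  # Blocks the calling thread until the unsubscribe has happened.
  def run
    @connection.subscribe(@channel) do |on|
      on.message do |_channel, msg|
        if @stop
          # Unsubscribe from the same thread that subscribed.
          @connection.unsubscribe(@channel)
        else
          @handler.call(msg)
        end
      end
    end
  end

  # Only sets the flag; the caller then publishes any message to wake the loop.
  def request_stop
    @stop = true
  end
end

# In-memory stand-in for a Redis connection (assumption, for illustration only).
class FakeConnection
  Callbacks = Struct.new(:handler) do
    def message(&block)
      self.handler = block
    end
  end

  def initialize
    @queue = Queue.new
    @subscribed = false
  end

  def subscribe(channel)
    callbacks = Callbacks.new
    yield callbacks
    @subscribed = true
    while @subscribed
      msg = @queue.pop
      callbacks.handler.call(channel, msg)
    end
  end

  def unsubscribe(_channel)
    @subscribed = false
  end

  def publish(_channel, msg)
    @queue << msg
  end
end

conn = FakeConnection.new
received = []
sub = StoppableSubscriber.new(conn, 'vip-deck') { |msg| received << msg }
worker = Thread.new { sub.run }

conn.publish('vip-deck', 'slide:2')
sleep 0.01 until received.any?        # wait for normal delivery

sub.request_stop
conn.publish('vip-deck', 'wake-up')   # any message; it only triggers the flag check
worker.join(2)

puts received.inspect
puts worker.alive?
```

With a real server, the subscriber would get its own `Redis.new(...)` connection and the wake-up message would be published through a separate connection, as the module does with `$redis` and `redis_sub`.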

The module you provided, with only minor debugging modifications:

require 'faye/websocket'
require 'redis'

class WsCommunication
  KEEPALIVE_TIME = 15 #seconds
  CHANNEL = 'vip-deck'

  def initialize(app)
    @app = app
    @clients = []
    uri = URI.parse(ENV['REDISCLOUD_URL'])
    $redis = Redis.new(host: uri.host, port: uri.port, password: uri.password)
    Thread.new do
      redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
      redis_sub.subscribe(CHANNEL) do |on|
        on.message do |channel, msg|
          puts "Message event. Clients receiving:#{@clients.count};"
          @clients.each { |ws| ws.send(msg) }
        end
      end
    end
  end

  def call(env)
    if Faye::WebSocket.websocket?(env)
      ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})

      ws.on :open do |event|
        @clients << ws
        puts "Open event. Clients open:#{@clients.count};"
      end

      ws.on :message do |event|
        receivers = $redis.publish(CHANNEL, event.data)
        puts "Message published:#{event.data}; Receivers:#{receivers};"
      end

      ws.on :close do |event|
        @clients.delete(ws)
        puts "Close event. Clients open:#{@clients.count};"
        ws = nil
      end

      ws.rack_response
    else
      @app.call(env)
    end
  end
end

The test subscriber code I provided:

# encoding: UTF-8
puts "Starting client-subscriber.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'websocket-client-simple'

puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"

url = ARGV.shift || 'ws://localhost:3000'

EM.run do

  ws = WebSocket::Client::Simple.connect url

  ws.on :message do |msg|
    puts msg
  end

  ws.on :open do
    puts "-- Subscriber open (#{ws.url})"
  end

  ws.on :close do |e|
    puts "-- Subscriber close (#{e.inspect})"
    exit 1
  end

  ws.on :error do |e|
    puts "-- Subscriber error (#{e.inspect})"
  end

end

The test publisher code I provided. Publisher and Subscriber could easily be combined, as these are just tests:

# encoding: UTF-8
puts "Starting client-publisher.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'json'
require 'websocket-client-simple'

puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"

url = ARGV.shift || 'ws://localhost:3000'

EM.run do
  count ||= 0
  timer = EventMachine.add_periodic_timer(5+rand(5)) do
    count += 1
    send({"MESSAGE": "COUNT:#{count};"})
  end

  @ws = WebSocket::Client::Simple.connect url

  @ws.on :message do |msg|
    puts msg
  end

  @ws.on :open do
    puts "-- Publisher open"
  end

  @ws.on :close do |e|
    puts "-- Publisher close (#{e.inspect})"
    exit 1
  end

  @ws.on :error do |e|
    puts "-- Publisher error (#{e.inspect})"
    @ws.close
  end

  def self.send message
    payload = message.is_a?(Hash) ? message : {payload: message}
    @ws.send(payload.to_json)
  end
end

A sample config.ru which runs all this at the rack middleware layer:

require './controllers/main'
require './middlewares/ws_communication'
use WsCommunication
run Main.new

This is Main. I stripped it down from my running version, so it might need tweaking if you use it:

%w(rubygems bundler sinatra/base json erb).each { |m| require m }
ENV['RACK_ENV'] ||= 'development'
Bundler.require
$: << File.expand_path('../', __FILE__)
$: << File.expand_path('../lib', __FILE__)

Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']

class Main < Sinatra::Base
  env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']

  get "/" do
    erb :"index.html"
  end

  get "/assets/js/application.js" do
    content_type :js
    @scheme = env == "production" ? "wss://" : "ws://"
    erb :"application.js"
  end
end
神经病院院长
Reply #3 · 2019-06-24 04:02

@clients should be shared across all threads. Are you sure that the client is not deleted from the array by accident? Try adding puts "client deleted" to the ws.on :close block and test it. You could also try guarding every use of the @clients variable with a mutex, as described here: http://ruby-doc.org/core-2.2.0/Mutex.html
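As a small check of the first point, the sketch below starts a thread inside initialize while the array is still empty, much like the middleware does, and then reads the array from that thread after clients were added. `Registry` and its methods are hypothetical names used only for this illustration. The thread's block closes over the same self, so it sees the same @clients array.

```ruby
class Registry
  attr_reader :reports

  def initialize
    @clients = []
    @inbox = Queue.new
    @reports = []
    # The block runs with the same self as initialize, so @clients here
    # is the very array that add() appends to later.
    @worker = Thread.new do
      while (msg = @inbox.pop) != :stop
        @reports << "#{msg}: #{@clients.count} client(s)"
      end
    end
  end

  def add(client)
    @clients << client
  end

  def notify(msg)
    @inbox << msg
  end

  def stop
    @inbox << :stop
    @worker.join
  end
end

registry = Registry.new
registry.add(:browser_a)
registry.add(:browser_b)
registry.notify('slide:3')
registry.stop

puts registry.reports
```

If the thread in your middleware reports zero clients while another part of the same class reports more, that points toward two different objects rather than toward threads failing to share the variable.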
