Thread running in Middleware is using old version

Posted 2019-05-04 23:21

Question:

I followed the Heroku tutorial to implement WebSockets.

It works properly with Thin, but not with Unicorn or Puma.

There is also an echo message implemented, which responds to the client's message. It works properly on every server, so the WebSocket implementation itself is not the problem.

The Redis setup is also correct (it receives all messages and executes the code inside the subscribe block).

How it works now:

On server start, an empty @clients array is initialized. Then a new Thread is started, which listens to Redis and is meant to forward each message to the corresponding user's connections in the @clients array.

On page load, a new WebSocket connection is created and stored in the @clients array.

If we receive a message from the browser, we send it back to all clients connected as the same user (that part works properly on both Thin and Puma).

If we receive a message from Redis, we also look up all of the user's connections stored in the @clients array. This is where the weird thing happens:

  • When running with Thin, it finds the connections in the @clients array and sends the message to them.

  • When running with Puma/Unicorn, the @clients array is always empty, even if we try the following sequence (without a page reload or anything else):

    1. Send message from browser -> @clients.length is 1, message is delivered
    2. Send message via Redis -> @clients.length is 0, message is lost
    3. Send message from browser -> @clients.length is still 1, message is delivered

Could someone please clarify what I am missing?

Relevant Puma server config:

workers 1
threads_count = 1
threads threads_count, threads_count

Relevant middleware code:

require 'faye/websocket'
require 'json'
require 'redis'

class NotificationsBackend

  def initialize(app)
    @app     = app
    @clients = []
    Thread.new do
      redis_sub = Redis.new
      redis_sub.subscribe(CHANNEL) do |on|
        on.message do |channel, msg|
          # logging @clients.length from here will always return 0
          # [..] retrieve user
          # msg is the raw payload received from Redis
          send_message(user.id, { message: "ECHO: #{msg}" })
        end
      end
    end
  end

  def call(env)
    if Faye::WebSocket.websocket?(env)
      ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME })
      ws.on :open do |event|
        # [..] retrieve current user
        if user
          # add ws connection to @clients array
        else
          # close ws
        end
      end

      ws.on :message do |event|
        # [..] retrieve current user
        # publish takes the channel name and a string payload
        Redis.current.publish(CHANNEL, { user_id: user.id, message: "ECHO: #{event.data}" }.to_json)
      end

      ws.rack_response
    else
      @app.call(env)
    end
  end

  def send_message(user_id, message)
    # logging @clients.length here will always return correct result
    # cs = all connections which belong to that client
    cs.each { |c| c.send(message.to_json) }
  end
end

Answer 1:

Unicorn (and apparently Puma) start a master process and then fork one or more workers. fork copies your entire process (or at least presents the illusion of copying; an actual copy usually only happens when you write to pages), but only the thread that called fork exists in the new process.

Clearly your app is being initialised before the fork. This is normally done so that workers can start quickly and benefit from copy-on-write memory savings. As a consequence, your Redis-checking thread runs only in the master process, whereas @clients is modified in the child process.

You can probably work around this by either deferring the creation of your Redis thread or disabling app preloading. However, you should be aware that your setup will prevent you from scaling beyond a single worker process (which, with Puma and a thread-friendly Ruby implementation such as JRuby, would be less of a constraint).
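
The fork behaviour described in this answer can be reproduced in isolation. The following is only a minimal sketch, assuming a platform where Process.fork is available (Linux or macOS); the local variables clients and listener are hypothetical stand-ins for @clients and the Redis subscriber thread, and no web server or Redis is involved.

```ruby
# A thread started before fork does not run in the child, and the child's
# changes to an array are not visible in the parent.

clients  = []                     # stands in for @clients
listener = Thread.new { sleep }   # stands in for the Redis subscriber thread

reader, writer = IO.pipe

pid = fork do
  reader.close
  clients << :connection          # modifies only the child's copy
  # Only the forking thread survives in the child, so the listener is dead here.
  writer.puts "child_listener_alive=#{listener.alive?} child_clients=#{clients.length}"
  writer.close
  exit!(0)
end

writer.close
child_report = reader.read.strip
Process.wait(pid)

parent_report = "parent_listener_alive=#{listener.alive?} parent_clients=#{clients.length}"

puts child_report
puts parent_report

listener.kill
```

The parent plays the role of the preloading master process: its listener stays alive but only ever sees an empty array, while the child (the worker) holds the connection and has no listener.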



Answer 2:

In case somebody faces the same problem, here are two solutions I came up with:

1. Disable app preloading (this was the first solution I came up with)

Simply remove preload_app! from the puma.rb file. Each worker then initializes the app itself after forking, so the Redis thread and the @clients array it reads live in the same process as the other middleware methods (call etc.).

Drawback: you lose all the benefits of app preloading. That is OK if you have only 1 or 2 workers with a couple of threads, but if you need a lot of them, it's better to keep app preloading. So I continued my research, and here is another solution:

2. Move thread initialization out of the initialize method (this is what I use now)

For example, I moved it to the call method, so this is what the middleware class code looks like:

attr_accessor :subscriber

def call(env)
  @subscriber ||= Thread.new do # if no subscriber present, init new one
    redis_sub = Redis.new(url: ENV['REDISCLOUD_URL'])
    redis_sub.subscribe(CHANNEL) do |on|
      on.message do |_, msg|
        # parsing message code here, retrieve user
        send_message(user.id, { message: "ECHO: #{msg}" })
      end
    end
  end
  # other code from method
end

Both solutions solve the same problem: the Redis-listening thread is initialized in each Puma worker process, not in the master process (which does not actually serve requests).
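
One caveat with the second solution: `@subscriber ||= Thread.new` is not atomic, so with more than one Puma thread two simultaneous requests could in principle each start a subscriber. Below is a rough sketch of one way to guard the lazy start with a Mutex. LazySubscriber and ensure_started are hypothetical names, not part of Puma, Redis or faye-websocket, and the block passed in stands in for the Redis subscribe loop.

```ruby
# Lazily start at most one live background thread per process.
class LazySubscriber
  def initialize(&work)
    @work   = work
    @lock   = Mutex.new
    @thread = nil
  end

  # Starts the thread on first call; later calls return the same thread.
  # A dead thread (for example one lost across a fork) is replaced.
  def ensure_started
    @lock.synchronize do
      @thread = Thread.new(&@work) unless @thread&.alive?
      @thread
    end
  end
end

started = Queue.new
subscriber = LazySubscriber.new do
  started << Process.pid   # record which process runs the listener
  sleep                    # stands in for the blocking Redis subscribe call
end

# Simulate several request threads reaching call(env) at the same time.
request_threads = 8.times.map { Thread.new { subscriber.ensure_started } }
results = request_threads.map(&:value)

listener_pid = started.pop # blocks until the listener thread has begun running

puts "distinct listener threads: #{results.uniq.length}"
puts "listener runs in current process: #{listener_pid == Process.pid}"
```

In the middleware, call(env) would invoke ensure_started before handling the request. Because the check also restarts a thread that is no longer alive, the same guard would cover a worker process that inherited a dead subscriber from the master.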