Rails: How to listen to / pull from service or queue

Posted 2019-04-28 07:03

Question:

Most Rails applications wait for requests coming from a client and then do their magic. But suppose I want to use a Rails application as part of a microservice architecture with some asynchronous communication: Service A sends an event into a Kafka or RabbitMQ queue, and Service B (my Rails app) is supposed to listen to this queue. How can I tune or start the Rails app so that it immediately listens to a queue and is triggered by events from there? (Meaning the initial trigger does not come from a client, but from the app itself.)

Thanks for your advice!

Answer 1:

I just set up RabbitMQ messaging within my application and will be implementing it for decoupled (multiple, distributed) applications in the next day or so. I found this article very helpful (and the RabbitMQ tutorials, too). All the code below is for RabbitMQ and assumes you have a RabbitMQ server up and running on your local machine.

Here's what I have so far, and it's working for me:

  # Gemfile
  gem 'bunny'
  gem 'sneakers'

I have a Publisher that sends to the queue:

  # app/agents/messaging/publisher.rb
  module Messaging
    class Publisher
      class << self

        def publish(args)
          connection = Bunny.new
          connection.start
          channel = connection.create_channel
          queue_name = "#{args.keys.first.to_s.pluralize}_queue"
          queue = channel.queue(queue_name, durable: true)
          channel.default_exchange.publish(args[args.keys.first].to_json, :routing_key => queue.name)
          puts "in #{self}.#{__method__}, [x] Sent #{args}!"
          connection.close
        end

      end
    end
  end

Which I use like this:

  Messaging::Publisher.publish(event: {... event details...})

Then I have my 'listener':

  # app/agents/messaging/events_queue_receiver.rb
  require_dependency "#{Rails.root.join('app','agents','messaging','events_agent')}"

  module Messaging
    class EventsQueueReceiver
      include Sneakers::Worker
      from_queue :events_queue, env: nil

      def work(msg)
        logger.info msg
        response = Messaging::EventsAgent.distribute(JSON.parse(msg).with_indifferent_access)
        ack! if response[:success]
      end

    end
  end
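
The worker above only acknowledges on success and lets a malformed payload raise inside JSON.parse. A minimal sketch of making each outcome explicit follows. The Settlement module and its decide method are hypothetical names, not part of Sneakers. Inside a Sneakers worker, the three symbols would correspond to calling ack!, reject!, or requeue!, and whether a failed event should be requeued or rejected is a design choice, not something the original answer specifies.

```ruby
require 'json'

# Hypothetical helper (not part of Sneakers): parses a raw message, runs the
# given block on the parsed event, and names how the message should be settled.
module Settlement
  def self.decide(raw)
    event = JSON.parse(raw, symbolize_names: true)
    response = yield(event)
    # Assumption: the block returns a hash with a :success key, like
    # Messaging::EventsAgent.distribute does.
    response[:success] ? :ack : :requeue
  rescue JSON::ParserError
    :reject # a malformed payload will never parse, so retrying is pointless
  end
end

puts Settlement.decide('{"event_name":"enroll_in_program"}') { |_event| { success: true } }
puts Settlement.decide('not json') { |_event| { success: true } }
```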

The 'listener' sends the message to Messaging::EventsAgent.distribute, which is like this:

  # app/agents/messaging/events_agent.rb
  require_dependency "#{Rails.root.join('app','agents','fsm','state_assignment_agent')}"

  module Messaging
    class EventsAgent
      EVENT_HANDLERS = {
        enroll_in_program: ["FSM::StateAssignmentAgent"]
      }
      class << self

        def publish(event)
          Messaging::Publisher.publish(event: event)
        end

        def distribute(event)
          puts "in #{self}.#{__method__}, message"
          if event[:handler]
            puts "in #{self}.#{__method__}, event[:handler]: #{event[:handler]}"
            event[:handler].constantize.handle_event(event)
          else
            event_name = event[:event_name].to_sym
            EVENT_HANDLERS[event_name].each do |handler|
              event[:handler] = handler
              publish(event)
            end
          end
          return {success: true}
        end

      end
    end
  end

Following the instructions on Codetunes, I have:

  # Rakefile
  # Add your own tasks in files placed in lib/tasks ending in .rake,
  # for example lib/tasks/capistrano.rake, and they will automatically be available to Rake.

  require File.expand_path('../config/application', __FILE__)

  require 'sneakers/tasks'
  Rails.application.load_tasks

And:

  # app/config/sneakers.rb
  Sneakers.configure({})
  Sneakers.logger.level = Logger::INFO # the default DEBUG is too noisy

I open two console windows. In the first, I say (to get my listener running):

  $ WORKERS=Messaging::EventsQueueReceiver rake sneakers:run
  ... a bunch of start up info
  2016-03-18T14:16:42Z p-5877 t-14d03e INFO: Heartbeat interval used (in seconds): 2
  2016-03-18T14:16:42Z p-5899 t-14d03e INFO: Heartbeat interval used (in seconds): 2
  2016-03-18T14:16:42Z p-5922 t-14d03e INFO: Heartbeat interval used (in seconds): 2
  2016-03-18T14:16:42Z p-5944 t-14d03e INFO: Heartbeat interval used (in seconds): 2

In the second, I say:

  $ rails c --sandbox
  2.1.2 :001 > Messaging::Publisher.publish({:event=>{:event_name=>"enroll_in_program", :program_system_name=>"aha_chh", :person_id=>1}})
  in Messaging::Publisher.publish, [x] Sent {:event=>{:event_name=>"enroll_in_program", :program_system_name=>"aha_chh", :person_id=>1}}!
  => :closed 

Then, back in my first window, I see:

  2016-03-18T14:17:44Z p-5877 t-19nfxy INFO: {"event_name":"enroll_in_program","program_system_name":"aha_chh","person_id":1}
  in Messaging::EventsAgent.distribute, message
  in Messaging::EventsAgent.distribute, event[:handler]: FSM::StateAssignmentAgent

And in my RabbitMQ server, I see: [screenshot not preserved]

It's a pretty minimal setup and I'm sure I'll be learning a lot more in coming days.

Good luck!



Answer 2:

I'm afraid that for RabbitMQ at least you will need a client. RabbitMQ implements the AMQP protocol, as opposed to the HTTP protocol used by web servers. As Sergio mentioned above, Rails is a web framework, so it doesn't have AMQP support built into it. You'll have to use an AMQP client such as Bunny in order to subscribe to a Rabbit queue from within a Rails app.
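
As a rough sketch of what subscribing with Bunny could look like: the handle_payload and listen method names are hypothetical, the queue name is borrowed from the first answer, and a RabbitMQ server on localhost with default credentials is assumed. Bunny is required lazily inside listen so that the handler can be exercised without the gem or a broker.

```ruby
require 'json'

# Hypothetical handler: parses the JSON payload and returns a short
# description. Real application logic would go here instead.
def handle_payload(payload)
  event = JSON.parse(payload, symbolize_names: true)
  "received #{event[:event_name]}"
end

# Blocks the calling process and consumes messages from the given queue.
# Assumes the bunny gem is installed and RabbitMQ runs on localhost.
def listen(queue_name = 'events_queue')
  require 'bunny' # loaded lazily; only needed once we actually connect

  connection = Bunny.new
  connection.start
  channel = connection.create_channel
  queue = channel.queue(queue_name, durable: true)

  # manual_ack: true leaves the message on the broker until we ack it;
  # block: true keeps this thread waiting for deliveries.
  queue.subscribe(manual_ack: true, block: true) do |delivery_info, _properties, payload|
    puts handle_payload(payload)
    channel.ack(delivery_info.delivery_tag)
  end
ensure
  connection&.close
end

puts handle_payload('{"event_name":"enroll_in_program"}')
```

Calling listen from a Rake task that depends on :environment, or through rails runner, would give the handler access to the application's models. Either way it runs as its own long-lived process next to the web server.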



Answer 3:

Let's say Service A is sending events to a Kafka queue. You can run a background process alongside your Rails app that polls the Kafka queue and processes the queued messages. For the background process, you can use something like a cron job or Sidekiq.
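
One possible shape for such a background process is a long-running consumer rather than a cron job. The sketch below uses the ruby-kafka gem. The topic name, group id, broker address, and both method names are assumptions for illustration, and the gem is required lazily so the parsing step can be tried without a Kafka broker.

```ruby
require 'json'

# Hypothetical processing step: extracts the event name from a JSON message.
def process_kafka_message(value)
  event = JSON.parse(value, symbolize_names: true)
  event[:event_name].to_sym
end

# Runs until the process receives TERM. All defaults here are assumptions.
def consume(topic: 'events', group_id: 'rails-app', brokers: ['localhost:9092'])
  require 'kafka' # ruby-kafka gem, loaded only when consuming for real

  kafka = Kafka.new(brokers, client_id: 'rails-app')
  consumer = kafka.consumer(group_id: group_id)
  consumer.subscribe(topic)

  # Stop the consumer loop cleanly when the process is asked to terminate.
  trap('TERM') { consumer.stop }

  consumer.each_message do |message|
    puts "handling #{process_kafka_message(message.value)}"
  end
end

puts process_kafka_message('{"event_name":"enroll_in_program","person_id":1}')
```

If the per-message work is slow, the consumer loop could hand each event to a Sidekiq job instead of processing it inline.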



Answer 4:

Rails is a lot of things. Parts of it handle web requests. Other parts (ActiveRecord) don't care whether they are running in a web request, a script, or anything else. Rails itself does not even come with a production-worthy web server; you use other gems for that (e.g., thin for plain old web browsers, or wash_out for incoming SOAP requests). Rails only gives you the infrastructure/middleware to combine all the pieces regarding servers.

Unless your queue can call out to your application via some form of HTTP, for example as SOAP requests, you'll need something that listens to your queueing system, whatever that may be, and translates new "tickets" on your queue into controller actions in your Rails world.
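
To make the "something that listens and translates" idea concrete, here is a broker-free sketch that uses Ruby's built-in Queue as a stand-in for the real queueing system. EnrollmentsHandler, ROUTES, and run_listener are hypothetical names. In a real setup the pop call would be replaced by the broker client's subscribe loop.

```ruby
require 'json'

# Stand-in for a controller-like object; the name is hypothetical.
class EnrollmentsHandler
  def self.call(event)
    "enrolled person #{event[:person_id]} in #{event[:program_system_name]}"
  end
end

# Maps event names to the code that should run for them.
ROUTES = {
  'enroll_in_program' => EnrollmentsHandler
}.freeze

# Pops raw messages until the :stop sentinel arrives and dispatches each one.
def run_listener(queue, routes = ROUTES)
  results = []
  loop do
    raw = queue.pop # blocks until a message is available
    break if raw == :stop

    event = JSON.parse(raw, symbolize_names: true)
    handler = routes[event[:event_name]]
    results << (handler ? handler.call(event) : "no handler for #{event[:event_name]}")
  end
  results
end

queue = Queue.new
queue << '{"event_name":"enroll_in_program","program_system_name":"aha_chh","person_id":1}'
queue << '{"event_name":"unknown_event"}'
queue << :stop
puts run_listener(queue)
```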