-->

Rails 4.2 get delayed job id from active job

2019-02-16 22:17发布

问题:

Any idea how to get the Delayed::Job id from the ActiveJob enqueuing? When I enqueue a job I get back an instance of ActiveJob::Base with a @job_id, but that job id seems to be internal to ActiveJob. My best guess so far is just to walk down the most recently created jobs:

active_job_id = GenerateReportJob.perform_later(self.id).job_id
delayed_job = Delayed::Job.order(id: :desc).limit(5).detect do |job|
  YAML.load(job.handler).job_data['job_id'] == active_job_id
end

but that seems all kinds of hacky. Kind of surprised ActiveJob isn't returning the ID from Delayed::Job, especially since that is what is explicitly returned when the job gets enqueued.

== EDIT

Looks like I'm not the only one (https://github.com/rails/rails/issues/18821)

回答1:

In case anyone finds this in the future: Rails just accepted a patch to allow you to get this id from provider_job_id in Rails 5. You can get it to work with a patch like

ActiveJob::QueueAdapters::DelayedJobAdapter.singleton_class.prepend(Module.new do
  def enqueue(job)
    provider_job = super
    job.provider_job_id = provider_job.id
    provider_job
  end

  def enqueue_at(job, timestamp)
    provider_job = super
    job.provider_job_id = provider_job.id
    provider_job
  end
end)


回答2:

Inspired by the answer of Beguene and some reverse engineering of the Rails 5 ActiveJob code, I have made it work with Rails 4.2 by

1) adding following code in lib/active_job/queue_adapters/delayed_job_adapter.rb or config/initializers/delayed_job.rb (both locations have worked):

# file: lib/active_job/queue_adapters/delayed_job_adapter.rb
module ActiveJob
  module Core
    # ID optionally provided by adapter
    attr_accessor :provider_job_id
  end

  module QueueAdapters
    class DelayedJobAdapter
      class << self
        def enqueue(job) #:nodoc:
          delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name)
          job.provider_job_id = delayed_job.id
          delayed_job
        end

        def enqueue_at(job, timestamp) #:nodoc:
          delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, run_at: Time.at(timestamp))
          job.provider_job_id = delayed_job.id
          delayed_job
        end
      end
      class JobWrapper #:nodoc:
        attr_accessor :job_data

        def initialize(job_data)
          @job_data = job_data
        end

        def perform
          Base.execute(job_data)
        end
      end
    end
  end
end

The attr_accessor :provider_job_id statement is needed in Rails 4.2, since it is used in the enqueue method and is not yet defined in 4.2.

Then we can make use of it like follows:

2) define our own ActiveJob class:

# file: app/jobs/my_job.rb
class MyJob < ActiveJob::Base
  queue_as :default

  def perform(object, performmethod = method(:method))
    # Do something later
      returnvalue = object.send(performmethod)
      returnvalue
    end

  end
end

3) Now we can create a new job anywhere in the code:

job = MyJob.perform_later(Myobject, "mymethod")

This will put the method Myobject.mymethod into the queue.

4) The code in 1) helps us to find the Delayed Job that is associated with our job:

delayed_job = Delayed::Job.find(job.provider_job_id)

5) finally, we can do, whatever we need to do with the delayed_job, e.g. delete it:

delayed_job.delete

Note: in Rails 5, step 1) will not be needed anymore, since the exact same code is integral part of Rails 5.



回答3:

I made it work in Rails 4.2 using the new patch from Rails 5 like this:

create the file lib/active_job/queue_adapters/delayed_job_adapter.rb

module ActiveJob
  module QueueAdapters
    class DelayedJobAdapter
      class << self
        def enqueue(job) #:nodoc:
          delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name)
          job.provider_job_id = delayed_job.id
          delayed_job
        end

        def enqueue_at(job, timestamp) #:nodoc:
          delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, run_at: Time.at(timestamp))
          job.provider_job_id = delayed_job.id
          delayed_job
        end
      end

      class JobWrapper #:nodoc:
        attr_accessor :job_data

        def initialize(job_data)
          @job_data = job_data
        end

        def perform
          Base.execute(job_data)
        end
      end
    end
  end
end


回答4:

Instead of removing the job from the queue if it is cancelled you could model a cancellation of the job itself.

Then, when you come to run the GenerateReportJob you can first check for a cancellation of the report. If there is one then you can destroy the cancellation record and drop out of the report generation. If there is no cancellation then you can carry on as normal.