Sidekiq: Ensure all jobs on the queue are unique

2019-01-22 04:32发布

问题:

I have some update triggers which push jobs onto the Sidekiq queue. So in some cases, there can be multiple jobs to process the same object.

There are a couple of uniqueness plugins ("Middleware", Unique Jobs), they're not documented much, but they seem to be more like throttlers to prevent repeat processing; what I want is a throttler that prevents repeat creating of the same jobs. That way, an object will always be processed in its freshest state. Is there a plugin or technique for this?


Update: I didn't have time to make a middleware, but I ended up with a related cleanup function to ensure queues are unique: https://gist.github.com/mahemoff/bf419c568c525f0af903

回答1:

My suggestion is to search for prior scheduled jobs based on some select criteria and delete, before scheduling a new one. This has been useful for me when i want a single scheduled job for a particular Object, and/or one of its methods.

Some example methods in this context:

 find_jobs_for_object_by_method(klass, method)

  jobs = Sidekiq::ScheduledSet.new

  jobs.select { |job|
    job.klass == 'Sidekiq::Extensions::DelayedClass' &&
        ((job_klass, job_method, args) = YAML.load(job.args[0])) &&
        job_klass == klass &&
        job_method == method
  }

end

##
# delete job(s) specific to a particular class,method,particular record
# will only remove djs on an object for that method
#
def self.delete_jobs_for_object_by_method(klass, method, id)

  jobs = Sidekiq::ScheduledSet.new
  jobs.select do |job|
    job.klass == 'Sidekiq::Extensions::DelayedClass' &&
        ((job_klass, job_method, args) = YAML.load(job.args[0])) &&
        job_klass == klass &&
        job_method == method  &&
        args[0] == id
  end.map(&:delete)

end

##
# delete job(s) specific to a particular class and particular record
# will remove any djs on that Object
#
def self.delete_jobs_for_object(klass, id)

  jobs = Sidekiq::ScheduledSet.new
  jobs.select do |job|
    job.klass == 'Sidekiq::Extensions::DelayedClass' &&
        ((job_klass, job_method, args) = YAML.load(job.args[0])) &&
        job_klass == klass &&
        args[0] == id
  end.map(&:delete)

end


回答2:

What about a simple client middleware?

module Sidekiq
  class UniqueMiddleware

    def call(worker_class, msg, queue_name, redis_pool)
      if msg["unique"]
        queue = Sidekiq::Queue.new(queue_name)
        queue.each do |job|
          if job.klass == msg['class'] && job.args == msg['args']
            return false
          end
        end
      end

      yield

    end
  end
end

Just register it

  Sidekiq.configure_client do |config|
    config.client_middleware do |chain|
      chain.add Sidekiq::UniqueMiddleware
    end
  end

Then in your job just set unique: true in sidekiq_options when needed



回答3:

Take a look at this: https://github.com/mhenrixon/sidekiq-unique-jobs

It's sidekiq with unique jobs added



回答4:

Maybe you could use Queue Classic which enqueues jobs on a Postgres database (in a really open way), so it could be extended (open-source) to check for uniqueness before doing so.