I have some update triggers which push jobs onto the Sidekiq queue. So in some cases, there can be multiple jobs to process the same object.
There are a couple of uniqueness plugins ("Middleware", Unique Jobs), they're not documented much, but they seem to be more like throttlers to prevent repeat processing; what I want is a throttler that prevents repeat creating of the same jobs. That way, an object will always be processed in its freshest state. Is there a plugin or technique for this?
Update: I didn't have time to make a middleware, but I ended up with a related cleanup function to ensure queues are unique: https://gist.github.com/mahemoff/bf419c568c525f0af903
My suggestion is to search for prior scheduled jobs based on some select criteria and delete, before scheduling a new one. This has been useful for me when i want a single scheduled job for a particular Object, and/or one of its methods.
Some example methods in this context:
find_jobs_for_object_by_method(klass, method)
jobs = Sidekiq::ScheduledSet.new
jobs.select { |job|
job.klass == 'Sidekiq::Extensions::DelayedClass' &&
((job_klass, job_method, args) = YAML.load(job.args[0])) &&
job_klass == klass &&
job_method == method
}
end
##
# delete job(s) specific to a particular class,method,particular record
# will only remove djs on an object for that method
#
def self.delete_jobs_for_object_by_method(klass, method, id)
jobs = Sidekiq::ScheduledSet.new
jobs.select do |job|
job.klass == 'Sidekiq::Extensions::DelayedClass' &&
((job_klass, job_method, args) = YAML.load(job.args[0])) &&
job_klass == klass &&
job_method == method &&
args[0] == id
end.map(&:delete)
end
##
# delete job(s) specific to a particular class and particular record
# will remove any djs on that Object
#
def self.delete_jobs_for_object(klass, id)
jobs = Sidekiq::ScheduledSet.new
jobs.select do |job|
job.klass == 'Sidekiq::Extensions::DelayedClass' &&
((job_klass, job_method, args) = YAML.load(job.args[0])) &&
job_klass == klass &&
args[0] == id
end.map(&:delete)
end
What about a simple client middleware?
module Sidekiq
class UniqueMiddleware
def call(worker_class, msg, queue_name, redis_pool)
if msg["unique"]
queue = Sidekiq::Queue.new(queue_name)
queue.each do |job|
if job.klass == msg['class'] && job.args == msg['args']
return false
end
end
end
yield
end
end
end
Just register it
Sidekiq.configure_client do |config|
config.client_middleware do |chain|
chain.add Sidekiq::UniqueMiddleware
end
end
Then in your job just set unique: true
in sidekiq_options when needed
Take a look at this: https://github.com/mhenrixon/sidekiq-unique-jobs
It's sidekiq with unique jobs added
Maybe you could use Queue Classic which enqueues jobs on a Postgres database (in a really open way), so it could be extended (open-source) to check for uniqueness before doing so.