I would like to know the best way to implement a thread-based queue.
For example:
I have 10 actions that I want to execute with only 4 threads. I would like to create a queue holding all 10 actions in order and start the first 4 actions on 4 threads. As soon as one thread finishes, the next action starts, and so on, so that at any time there are at most 4 threads running.
There is a Queue class in thread in the standard library. Using that you can do something like this:
require 'thread'

queue = Queue.new
threads = []

# add work to the queue (work_unit is a placeholder for whatever
# you need to process; push one item per action)
queue << work_unit

4.times do
  threads << Thread.new do
    # loop until there are no more things to do
    until queue.empty?
      # pop with the non-blocking flag set, this raises
      # an exception if the queue is empty, in which case
      # work_unit will be set to nil
      work_unit = queue.pop(true) rescue nil
      if work_unit
        # do work
      end
    end
    # when there is no more work, the thread will stop
  end
end

# wait until all threads have completed processing
threads.each { |t| t.join }
The reason I pop with the non-blocking flag is that between the until queue.empty? check and the pop, another thread may have popped the last item. Without the non-blocking flag, the thread could then block on that line forever.
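An alternative to the non-blocking pop above is to push one stop marker per worker and use a plain blocking pop. The following is only a sketch under assumptions: the :done marker, the results queue, and the "double the number" work are made up for illustration, and it assumes :done never appears as a real work item.

```ruby
require 'thread'

queue = Queue.new
results = Queue.new

# hypothetical work units: the numbers 1..10
(1..10).each { |n| queue << n }

worker_count = 4
# one :done marker per worker, so every thread eventually sees a stop signal
worker_count.times { queue << :done }

threads = worker_count.times.map do
  Thread.new do
    # a blocking pop is safe here because a marker is guaranteed to arrive
    while (work_unit = queue.pop) != :done
      results << work_unit * 2 # stand-in for the real work
    end
  end
end

threads.each(&:join)

# drain the results queue into an array
doubled = []
doubled << results.pop until results.empty?
p doubled.sort
```

Because each worker consumes exactly one marker, no thread can be left waiting on an empty queue, and there is no check-then-pop race to guard against.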
If you're using MRI, the default Ruby interpreter, bear in mind that threads will not run truly in parallel: its global interpreter lock lets only one thread execute Ruby code at a time. If your work is CPU-bound you may just as well run single-threaded. If you have some operation that blocks on I/O you may get some parallelism, but YMMV. Alternatively, you can use an interpreter that allows full parallelism, such as JRuby or Rubinius.
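To get a rough feel for the I/O case, you can time the same blocking call run sequentially and in threads. This is just a sketch: fake_io and time_it are hypothetical helpers, and sleep stands in for a real blocking I/O call. Actual numbers will vary by machine and interpreter.

```ruby
# sleep stands in for a blocking I/O call (an assumption for illustration)
def fake_io
  sleep 0.2
end

# returns the wall-clock seconds the given block took
def time_it
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
end

sequential = time_it { 4.times { fake_io } }
threaded = time_it { 4.times.map { Thread.new { fake_io } }.each(&:join) }

puts format("sequential: %.2fs, threaded: %.2fs", sequential, threaded)
```

On MRI the threaded run should finish sooner, since a sleeping (or I/O-blocked) thread releases the lock. Replacing fake_io with a CPU-heavy loop would be expected to show little or no gain there.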
There are a few gems that implement this pattern for you: parallel, peach, and mine, which is called threach (or jruby_threach under JRuby). It's a drop-in replacement for #each that lets you specify how many threads to run with, using a SizedQueue underneath to keep things from spiraling out of control.
So...
(1..10).threach(4) {|i| do_my_work(i) }
Not pushing my own stuff; there are plenty of good implementations out there to make things easier.
If you're using JRuby, jruby_threach is a much better implementation: Java simply offers a much richer set of threading primitives and data structures to use.
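For anyone curious how a SizedQueue keeps things bounded, here is a rough sketch of the idea in plain Ruby. It is not threach's actual code: each_in_threads, the :stop marker, and the queue capacity of twice the thread count are all assumptions made for illustration.

```ruby
require 'thread'

# Hypothetical helper, NOT threach's real implementation.
# Runs the block for every item using thread_count worker threads.
def each_in_threads(items, thread_count, &block)
  # bounded queue: the producer blocks on << once it is full
  queue = SizedQueue.new(thread_count * 2)

  workers = thread_count.times.map do
    Thread.new do
      # blocking pop; :stop tells this worker to exit
      while (item = queue.pop) != :stop
        block.call(item)
      end
    end
  end

  items.each { |item| queue << item }      # blocks whenever workers fall behind
  thread_count.times { queue << :stop }    # one stop marker per worker
  workers.each(&:join)
  items
end

results = Queue.new
each_in_threads(1..10, 4) { |i| results << i * i }

squares = []
squares << results.pop until results.empty?
p squares.sort
```

The bounded queue matters mostly when the item source is large or lazy, since the producer can never get more than a few items ahead of the workers. This sketch does no error handling, so an exception in the block would kill a worker and surface at join.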
Executable descriptive example:
require 'thread'

p tasks = [
  {:file => 'task1'},
  {:file => 'task2'},
  {:file => 'task3'},
  {:file => 'task4'},
  {:file => 'task5'}
]

tasks_queue = Queue.new
tasks.each {|task| tasks_queue << task}

# run workers
workers_count = 3
workers = []
workers_count.times do |n|
  workers << Thread.new(n+1) do |my_n|
    # non-blocking shift raises when the queue is empty; rescue turns that into nil
    while (task = tasks_queue.shift(true) rescue nil) do
      delay = rand(0) # random float in [0, 1)
      sleep delay
      task[:result] = "done by worker ##{my_n} (in #{delay})"
      p task
    end
  end
end

# wait for all threads
workers.each(&:join)

# output results
puts "all done"
p tasks
You could use a thread pool. It's a fairly common pattern for this type of problem.
http://en.wikipedia.org/wiki/Thread_pool_pattern
Github seems to have a few implementations you could try out:
https://github.com/search?type=Everything&language=Ruby&q=thread+pool
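To show what the pattern boils down to, here is a minimal sketch of a pool built on Queue. TinyPool and its schedule and shutdown methods are hypothetical names invented for this example, not the API of any of the gems or projects linked above, and it skips error handling entirely.

```ruby
require 'thread'

# Hypothetical minimal pool, for illustration only
class TinyPool
  def initialize(size)
    @jobs = Queue.new
    @workers = size.times.map do
      Thread.new do
        # each worker blocks on the queue until a job or :shutdown arrives
        while (job = @jobs.pop) != :shutdown
          job.call
        end
      end
    end
  end

  # queue a block to be run by whichever worker is free next
  def schedule(&block)
    @jobs << block
  end

  # let the workers finish the queued jobs, then stop them
  def shutdown
    @workers.size.times { @jobs << :shutdown }
    @workers.each(&:join)
  end
end

pool = TinyPool.new(4)
finished = Queue.new
(1..10).each { |i| pool.schedule { finished << i } }
pool.shutdown
puts "ran #{finished.size} actions"
```

With 4 workers and 10 scheduled blocks, at most 4 actions run at any time, which matches the setup described in the question.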
Celluloid has a worker pool example that does this.
I use a gem called work_queue. It's really practical.
Example:
require 'work_queue'

wq = WorkQueue.new 4, 10
(1..10).each do |number|
  wq.enqueue_b("Thread#{number}") do |thread_name|
    puts "Hello from the #{thread_name}"
  end
end
wq.join