My problem is that I don't know how to synchronise multiple threads in Ruby. The task is to create six threads and start them immediately. All of them should do some work (for example, puts "Thread 1: Hi") one after another, in the order I specify.
I've tried working with Mutex, Monitor and ConditionVariable, but in every case the threads ran in random order. Could anybody explain how to achieve my goal?
After some time of struggling with Mutex and Condition Variable I've achieved my goal.
This code is a little bit messy, and I intentionally didn't use loops, for a "clearer view".
cv = ConditionVariable.new
mutex = Mutex.new
mutex2 = Mutex.new
cv2 = ConditionVariable.new
mutex3 = Mutex.new
cv3 = ConditionVariable.new
mutex4 = Mutex.new
cv4 = ConditionVariable.new
mutex5 = Mutex.new
cv5 = ConditionVariable.new
mutex6 = Mutex.new
cv6 = ConditionVariable.new
# Note: every waiter must already be inside cv.wait when its predecessor
# signals; a signal sent while nobody is waiting is lost.
Thread.new do
  mutex.synchronize {
    puts 'First: Hi'
    cv.wait(mutex)
    puts 'First: Bye'
    #cv.wait(mutex)
    cv.signal
    puts 'First: One more time'
  }
end
Thread.new do
  mutex.synchronize {
    puts 'Second: Hi'
    cv.signal
    cv.wait(mutex)
    puts 'Second: Bye'
    cv.signal
  }
  mutex2.synchronize {
    puts 'Second: Starting third'
    cv2.signal
  }
end
Thread.new do
  mutex2.synchronize {
    cv2.wait(mutex2)
    puts 'Third: Hi'
  }
  mutex3.synchronize {
    puts 'Third: Starting fourth'
    cv3.signal
  }
end
Thread.new do
  mutex3.synchronize {
    cv3.wait(mutex3)
    puts 'Fourth: Hi'
  }
  mutex4.synchronize {
    puts 'Fourth: Starting fifth'
    cv4.signal
  }
end
Thread.new do
  mutex4.synchronize {
    cv4.wait(mutex4)
    puts 'Fifth: Hi'
  }
  mutex5.synchronize {
    puts 'Fifth: Starting sixth'
    cv5.signal
  }
end
Thread.new {
  mutex5.synchronize {
    cv5.wait(mutex5)
    puts 'Sixth: Hi'
  }
}
# Crude way to keep the main thread alive until the others have finished.
sleep 2
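For comparison, here is a minimal loop-based sketch of the same idea with one Mutex and one ConditionVariable. It is not the code from the post above: the names run_in_order, turn and order are made up for illustration. The point it tries to show is that the ordering state lives in a shared variable that each thread re-checks, so a signal sent before a thread starts waiting does no harm.

```ruby
# Sketch only: `run_in_order`, `turn` and `order` are illustrative names.
def run_in_order(count)
  mutex = Mutex.new
  cv    = ConditionVariable.new
  turn  = 1    # number of the thread that is allowed to run next
  order = []   # records which thread ran when, for inspection

  threads = (1..count).map do |i|
    Thread.new do
      mutex.synchronize do
        # Wait only while it is not this thread's turn, and re-check
        # the condition after every wake-up.
        cv.wait(mutex) until turn == i
        puts "Thread #{i}: Hi"
        order << i
        turn += 1
        # Wake every waiter; only the one whose turn it is will proceed.
        cv.broadcast
      end
    end
  end

  threads.each(&:join)   # wait for all threads instead of sleeping
  order
end

run_in_order(6)
```

Joining the threads replaces the sleep 2 at the end, so the script waits exactly as long as the threads need.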
Using Queue as a PV Semaphore
You can abuse Queue, using it like a traditional PV Semaphore. To do this, you create an instance of Queue:
require 'thread'
...
sem = Queue.new
When a thread needs to wait, it calls Queue#deq:
# waiting thread
sem.deq
When some other thread wants to unblock the waiting thread, it pushes something (anything) onto the queue:
# another thread that wants to unblock the waiting thread
sem.enq :go
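Put together, the two halves might look like the following small sketch. The events array and the waiter variable are only there for illustration and are not part of the technique itself.

```ruby
require 'thread' # harmless on modern Ruby, needed on very old versions

sem    = Queue.new
events = []   # illustrative log of what happened, in order

waiter = Thread.new do
  sem.deq                # blocks here until something is pushed
  events << :released
end

events << :signalling
sem.enq :go              # unblocks the waiting thread
waiter.join

p events
```

Because the push is remembered by the queue, it does not matter whether the waiting thread reaches deq before or after the other thread calls enq.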
A Worker class
Here's a worker class that uses Queue to synchronize its start and stop:
class Worker
  def initialize(worker_number)
    @start = Queue.new
    Thread.new do
      @start.deq
      puts "Thread #{worker_number}"
      @when_done.call
    end
  end
  def start
    @start.enq :start
  end
  def when_done(&block)
    @when_done = block
  end
end
When constructed, a worker creates a thread, but that thread then waits on the @start queue. Not until #start is called will the thread unblock.
When done, the thread will execute the block that was passed to #when_done. We'll see how this is used in just a moment.
Creating workers
First, let's make sure that if any threads raise an exception, we get to find out about it:
Thread.abort_on_exception = true
We'll need six workers:
workers = (1..6).map { |i| Worker.new(i) }
Telling each worker what to do when it's done
Here's where #when_done comes into play:
workers.each_cons(2) do |w1, w2|
w1.when_done { w2.start }
end
This takes each pair of workers in turn. Each worker except the last is told that, when it finishes, it should start the worker after it. That just leaves the last worker. When it finishes, we want it to notify the main thread:
all_done = Queue.new
workers.last.when_done { all_done.enq :done }
Let's Go!
Now all that remains is to start the first thread:
workers.first.start
and wait for the last thread to finish:
all_done.deq
The output:
Thread 1
Thread 2
Thread 3
Thread 4
Thread 5
Thread 6
If you're just getting started with threads, you might want to try something simple. Let the 1st thread sleep for 1 second, the 2nd for 2 seconds, the 3rd for 3 seconds and so on:
$stdout.sync = true
threads = []
(1..6).each do |i|
  threads << Thread.new {
    sleep i
    puts "Hi from thread #{i}"
  }
end
threads.each(&:join)
Output (takes about 6 seconds in total because the threads sleep concurrently):
Hi from thread 1
Hi from thread 2
Hi from thread 3
Hi from thread 4
Hi from thread 5
Hi from thread 6
You can assign each thread a number, which denotes its place in the queue, and have it check that number to see whose turn it is:
class QueuedWorker
  def initialize(mutex, condition_variable, my_turn)
    @mutex = mutex
    @my_turn = my_turn
    @condition_variable = condition_variable
  end
  def self.turn
    @turn ||= 0
  end
  def self.done
    @turn = turn + 1
  end
  def run
    loop do
      @mutex.synchronize do
        if QueuedWorker.turn == @my_turn
          # do actual work
          QueuedWorker.done
          @condition_variable.signal
          return
        end
        @condition_variable.signal
        @condition_variable.wait(@mutex)
      end
    end
  end
end
mutex = Mutex.new
cv = ConditionVariable.new
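threads = (0..10).map do |i|
  Thread.new do
    QueuedWorker.new(mutex, cv, i).run
  end
end
# Wait for all workers; otherwise the script may exit before they run.
threads.each(&:join)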
That being said, the implementation is awkward, since threads are specifically not built for serial work. If you need something to run serially, do it in a single thread.
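As a rough sketch of that last suggestion, a single background thread can simply walk through the jobs in order. The results array and the "Job N" output are invented for illustration.

```ruby
results = []   # illustrative record of the order in which jobs ran

worker = Thread.new do
  (1..6).each do |i|
    puts "Job #{i}"   # stand-in for the actual work
    results << i
  end
end

worker.join   # wait for the single worker to finish
```

The ordering here comes for free from the loop, so no mutex or condition variable is needed.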