Question:
What's the best way to implement a Hash that can be modified across multiple threads, but with the smallest number of locks. For the purposes of this question, you can assume that the Hash will be read-heavy. It must be thread-safe in all Ruby implementations, including ones that operate in a truly simultaneous fashion, such as JRuby, and it must be written in pure-Ruby (no C or Java allowed).
Feel free to submit a naïve solution that always locks, but that isn't likely to be the best solution. Points for elegance, but a smaller likelihood of locking wins over smaller code.
Answer 1:
Okay, now that you have specified the actual meaning of 'threadsafe', here are two potential implementations. The following code will run forever in MRI and JRuby. The lockless implementation follows an eventual consistency model in which each thread uses its own view of the hash if the master is in flux. A little trickery is required to make sure that storing all the information in the thread doesn't leak memory, but that is handled and tested: process size does not grow while running this code. Both implementations would need more work to be 'complete' (delete, update, etc. would need some thinking), but either of the two concepts below will meet your requirements.
It's very important for people reading this thread to realize that the whole issue is exclusive to JRuby; in MRI the built-in Hash is sufficient.
module Cash
  def Cash.new(*args, &block)
    env = ENV['CASH_IMPL']
    impl = env ? Cash.const_get(env) : LocklessImpl
    klass = defined?(JRUBY_VERSION) ? impl : ::Hash
    klass.new(*args)
  end

  class LocklessImpl
    def initialize
      @hash = {}
    end

    def thread_hash
      thread = Thread.current
      thread[:cash] ||= {}
      hash = thread[:cash][thread_key]
      if hash
        hash
      else
        hash = thread[:cash][thread_key] = {}
        ObjectSpace.define_finalizer(self){ thread[:cash].delete(thread_key) }
        hash
      end
    end

    def thread_key
      [Thread.current.object_id, object_id]
    end

    def []=(key, val)
      time = Time.now.to_f
      tuple = [time, val]
      @hash[key] = tuple
      thread_hash[key] = tuple
      val
    end

    def [](key)
      # check the master value
      #
      val = @hash[key]
      # someone else is either writing the key or it has never been set. we
      # need to invalidate our own copy in either case
      #
      if val.nil?
        thread_val = thread_hash.delete(key)
        return(thread_val ? thread_val.last : nil)
      end
      # check our own thread local value
      #
      thread_val = thread_hash[key]
      # in this case someone else has written a value that we have never seen so
      # simply return it
      #
      if thread_val.nil?
        return(val.last)
      end
      # in this case there is a master *and* a thread local value, if the master
      # is newer juke our own cached copy
      #
      if val.first > thread_val.first
        thread_hash.delete(key)
        return val.last
      else
        return thread_val.last
      end
    end
  end

  class LockingImpl < ::Hash
    require 'sync'

    def initialize(*args, &block)
      super
    ensure
      extend Sync_m
    end

    def sync(*args, &block)
      sync_synchronize(*args, &block)
    end

    def [](key)
      sync(:SH){ super }
    end

    def []=(key, val)
      sync(:EX){ super }
    end
  end
end

if $0 == __FILE__
  iteration = 0
  loop do
    n = 42
    hash = Cash.new
    threads =
      Array.new(10) {
        Thread.new do
          Thread.current.abort_on_exception = true
          n.times do |key|
            hash[key] = key
            raise "#{ key }=nil" if hash[key].nil?
          end
        end
      }
    threads.map{|thread| thread.join}
    puts "THREADSAFE: #{ iteration += 1 }"
  end
end
Answer 2:
Posting base/naive solution, just to boost my Stack Overflow cred:
require 'thread'

class ConcurrentHash < Hash
  def initialize
    super
    @mutex = Mutex.new
  end

  def [](*args)
    @mutex.synchronize { super }
  end

  def []=(*args)
    @mutex.synchronize { super }
  end
end
Answer 3:
Yehuda, I think you mentioned ivar setting was atomic? What about a simple copy and swap then?
require 'thread'

class ConcurrentHash
  def initialize
    @reader, @writer = {}, {}
    @lock = Mutex.new
  end

  def [](key)
    @reader[key]
  end

  def []=(key, value)
    @lock.synchronize {
      @writer[key] = value
      @reader, @writer = @writer, @reader
      @writer[key] = value
    }
  end
end
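A related variation, sketched here under the same assumption that assigning an instance variable is atomic and visible to other threads on the target implementation: a writer copies the current hash, changes the copy, freezes it, and publishes it with a single assignment, so a reader never looks at a hash that is being mutated. The class name SnapshotHash is made up for this illustration and is not part of the answer above.

```ruby
# Copy-on-write sketch: reads take no lock; every write copies the table.
class SnapshotHash
  def initialize
    @snapshot = {}.freeze      # readers only ever see frozen hashes
    @write_lock = Mutex.new    # serializes writers so no update is lost
  end

  def [](key)
    @snapshot[key]             # one ivar read, then a lookup in a frozen hash
  end

  def []=(key, value)
    @write_lock.synchronize do
      copy = @snapshot.dup     # dup of a frozen hash is not frozen
      copy[key] = value
      @snapshot = copy.freeze  # publish the new version with one assignment
    end
    value
  end

  def size
    @snapshot.size
  end
end

h = SnapshotHash.new
writers = 4.times.map do |t|
  Thread.new { 100.times { |i| h[[t, i]] = i } }
end
writers.each(&:join)
puts h.size  # 400
```

Each write copies the whole table, so this only pays off when reads heavily outnumber writes and the hash stays reasonably small.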
Answer 4:
This is a wrapper class around Hash that allows concurrent readers, but locks things down for all other types of access (including iterated reads).
class LockedHash
  def initialize
    @hash = Hash.new
    @lock = ThreadAwareLock.new()
    @reader_count = 0
  end

  def [](key)
    @lock.lock_read
    ret = @hash[key]
    @lock.unlock_read
    ret
  end

  def []=(key, value)
    @lock.lock_write
    @hash[key] = value
    @lock.unlock_write
  end

  def method_missing(method_sym, *arguments, &block)
    if @hash.respond_to? method_sym
      @lock.lock_block
      val = lambda{@hash.send(method_sym,*arguments, &block)}.call
      @lock.unlock_block
      return val
    end
    super
  end
end
Here is the locking code it uses:
class RWLock
  def initialize
    @outer = Mutex.new
    @inner = Mutex.new
    @reader_count = 0
  end

  def lock_read
    @outer.synchronize{@inner.synchronize{@reader_count += 1}}
  end

  def unlock_read
    @inner.synchronize{@reader_count -= 1}
  end

  def lock_write
    @outer.lock
    while @reader_count > 0 ;end
  end

  def unlock_write
    @outer.unlock
  end
end

class ThreadAwareLock < RWLock
  def initialize
    @owner = nil
    super
  end

  def lock_block
    lock_write
    @owner = Thread.current.object_id
  end

  def unlock_block
    @owner = nil
    unlock_write
  end

  def lock_read
    super unless my_block?
  end

  def unlock_read
    super unless my_block?
  end

  def lock_write
    super unless my_block?
  end

  def unlock_write
    super unless my_block?
  end

  def my_block?
    @owner == Thread.current.object_id
  end
end
The thread-aware lock is to allow you to lock the class once, and then call methods that would normally lock, and have them not lock. You need this because you yield into blocks inside some methods, and those blocks can call locking methods on the object, and you don't want a deadlock or a double-lock error. You could use a counting lock instead for this.
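The counting-lock alternative can be sketched with Monitor from Ruby's standard library, whose synchronize may be re-entered by the thread that already holds it. This is only an illustration with a hypothetical ReentrantHash class, not a drop-in replacement for ThreadAwareLock, and it gives up concurrent readers.

```ruby
require 'monitor'

# A hash wrapper guarded by a reentrant (counting) lock.
class ReentrantHash
  def initialize
    @hash = {}
    @monitor = Monitor.new
  end

  def [](key)
    @monitor.synchronize { @hash[key] }
  end

  def []=(key, value)
    @monitor.synchronize { @hash[key] = value }
  end

  # Holds the lock for the whole iteration. The block may call back into
  # [] and []= because the owning thread can re-enter the monitor.
  def each_key
    @monitor.synchronize do
      @hash.keys.each { |key| yield key }
    end
  end
end

h = ReentrantHash.new
h[:a] = 1
h[:b] = 2
h.each_key { |k| h[k] = h[k] * 10 }  # nested locking, no deadlock
p [h[:a], h[:b]]  # [10, 20]
```

Because the monitor counts how many times its owner has entered, the nested calls release nothing until the outermost synchronize block exits.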
Here's an attempt to implement bucket-level read-write locks:
class SafeBucket
  def initialize
    @lock = RWLock.new()
    @value_pairs = []
  end

  def get(key)
    @lock.lock_read
    pair = @value_pairs.select{|p| p[0] == key}
    unless pair && pair.size > 0
      @lock.unlock_read
      return nil
    end
    ret = pair[0][1]
    @lock.unlock_read
    ret
  end

  def set(key, value)
    @lock.lock_write
    pair = @value_pairs.select{|p| p[0] == key}
    if pair && pair.size > 0
      pair[0][1] = value
      @lock.unlock_write
      return
    end
    @value_pairs.push [key, value]
    @lock.unlock_write
    value
  end

  def each
    @value_pairs.each{|p| yield p[0],p[1]}
  end
end

class MikeConcurrentHash
  def initialize
    @buckets = []
    100.times {@buckets.push SafeBucket.new}
  end

  def [](key)
    bucket(key).get(key)
  end

  def []=(key, value)
    bucket(key).set(key, value)
  end

  def each
    @buckets.each{|b| b.each{|key, value| yield key, value}}
  end

  def bucket(key)
    @buckets[key.hash % 100]
  end
end
I stopped working on this because it's too slow, so the each method is unsafe (allows mutations by other threads during an iteration) and it doesn't support most hash methods.
And here's a test harness for concurrent hashes:
require 'thread'

class HashHarness
  Keys = [:a, :basic, :test, :harness, :for, :concurrent, :testing, :of, :hashes,
          :that, :tries, :to, :provide, :a, :framework, :for, :designing, :a, :good, :ConcurrentHash,
          :for, :all, :ruby, :implementations]

  def self.go
    h = new
    r = h.writiness_range(20, 10000, 0, 0)
    r.each{|k, v| p k + ' ' + v.map{|p| p[1]}.join(' ')}
    return
  end

  def initialize(classes = [MikeConcurrentHash, JoshConcurrentHash, JoshConcurrentHash2, PaulConcurrentHash, LockedHash, Hash])
    @classes = classes
  end

  def writiness_range(basic_threads, ops, each_threads, loops)
    result = {}
    @classes.each do |hash_class|
      res = []
      0.upto 10 do |i|
        writiness = i.to_f / 10
        res.push [writiness,test_one(hash_class, basic_threads, ops, each_threads, loops, writiness)]
      end
      result[hash_class.name] = res
    end
    result
  end

  def test_one(hash_class, basic_threads, ops, each_threads, loops, writiness)
    time = Time.now
    threads = []
    hash = hash_class.new
    populate_hash(hash)
    begin
      basic_threads.times do
        threads.push Thread.new{run_basic_test(hash, writiness, ops)}
      end
      each_threads.times do
        threads.push Thread.new{run_each_test(hash, writiness, loops)}
      end
      threads.each{|t| t.join}
    rescue ThreadError => e
      p [e.message, hash_class.name, basic_threads, ops, each_threads, loops, writiness].join(' ')
      return -1
    end
    p [hash_class.name, basic_threads, ops, each_threads, loops, writiness, Time.now - time].join(' ')
    return Time.now - time
  end

  def run_basic_test(hash, writiness, ops)
    ops.times do
      rand < writiness ? hash[choose_key]= rand : hash[choose_key]
    end
  end

  def run_each_test(hash, writiness, loops)
    loops.times do
      hash.each do |k, v|
        if rand < writiness
          each_write_work(hash, k, v)
        else
          each_read_work(k, v)
        end
      end
    end
  end

  def each_write_work(hash, key, value)
    hash[key] = rand
  end

  def each_read_work(key, value)
    key.to_s + ": " + value.to_s
  end

  def choose_key
    Keys[rand(Keys.size)]
  end

  def populate_hash(hash)
    Keys.each{|key| hash[key]=rand}
  end
end
Numbers:
JRuby
Writiness            0.0     0.1     0.2     0.3     0.4     0.5     0.6     0.7     0.8     0.9     1.0
ConcurrentHash     2.098   3.179   2.971   3.083   2.731   2.941   2.564   2.480   2.369   1.862   1.881
LockedHash         1.873   1.896   2.085   2.058   2.001   2.055   1.904   1.921   1.873   1.841   1.630
Hash               0.530   0.672   0.685   0.822   0.719   0.877   0.901   0.931   0.942   0.950   1.001
And MRI
Writiness            0.0     0.1     0.2     0.3     0.4     0.5     0.6     0.7     0.8     0.9     1.0
ConcurrentHash     9.214   9.913   9.064  10.112  10.240  10.574  10.566  11.027  11.323  11.837  13.036
LockedHash        19.593  17.712  16.998  17.045  16.687  16.609  16.647  15.307  14.464  13.931  14.146
Hash               0.535   0.537   0.534   0.599   0.594   0.676   0.635   0.650   0.654   0.661   0.692
MRI numbers are pretty striking. Locking in MRI really sucks.
Answer 5:
This might be a use case for the hamster gem.
Hamster implements Hash Array Mapped Tries (HAMT), as well as some other persistent data structures, in pure Ruby.
Persistent data structures are immutable, and instead of mutating (changing) the structure, such as by adding or replacing a key-value pair in a Hash, you instead return a new data structure which contains the change. The trick, with the persistent immutable data structures, is that the newly returned data structure re-uses as much of the predecessor as possible.
I think that to implement this using hamster, you would use their mutable hash wrapper, which passes all reads to the current value of the persistent immutable hash (i.e., it should be fast), while guarding all writes with a mutex and swapping to the new value of the persistent immutable hash after the write.
For example:
require 'hamster'
require 'hamster/experimental/mutable_hash'
hsh = Hamster.mutable_hash(:name => "Simon", :gender => :male)
# reading goes directly to hash
puts hsh[:name] # Simon
# writing is actually swapping to new value of underlying persistent data structure
hsh.put(:name, "Joe")
puts hsh[:name] # Joe
So, let's use this for a similar type of problem to the one described:
(gist here)
require 'hamster'
require 'hamster/experimental/mutable_hash'

# a bunch of threads with a read/write ratio of 10:1
num_threads = 100
num_reads_per_write = 10
num_loops = 100
hsh = Hamster.mutable_hash

puts RUBY_DESCRIPTION
puts "#{num_threads} threads x #{num_loops} loops, #{num_reads_per_write}:1 R/W ratio"

t0 = Time.now
Thread.abort_on_exception = true
threads = (0...num_threads).map do |n|
  Thread.new do
    write_key = n % num_reads_per_write
    read_keys = (0...num_reads_per_write).to_a.shuffle # random order
    last_read = nil
    num_loops.times do
      read_keys.each do |k|
        # Reads
        last_read = hsh[k]
        Thread.pass
        # Atomic increments in the correct ratio to reads
        hsh.put(k) { |v| (v || 0) + 1 } if k == write_key
      end
    end
  end
end
threads.map { |t| t.join }
t1 = Time.now

puts "Error in keys" unless (0...num_reads_per_write).to_a == hsh.keys.sort.to_a
puts "Error in values" unless hsh.values.all? { |v| v == (num_loops * num_threads) / num_reads_per_write }
puts "Time elapsed: #{t1 - t0} s"
I'm getting the following outputs:
ruby 1.9.2p320 (2012-04-20 revision 35421) [x86_64-linux]
100 threads x 100 loops, 10:1 R/W ratio
Time elapsed: 5.763414627 s
jruby 1.7.0 (1.9.3p203) 2012-10-22 ff1ebbe on Java HotSpot(TM) 64-Bit Server VM 1.6.0_26-b03 [linux-amd64]
100 threads x 100 loops, 10:1 R/W ratio
Time elapsed: 1.697 s
What do you think of this?
This solution is more similar to how one might solve this in Scala or Clojure, although in those languages one would more likely be using software transactional memory with low-level CPU support for the atomic compare and swap operations which are implemented.
Edit: It's worth noting that one reason the hamster implementation is fast is that it features a lock-free read path. Please reply in comments if you have questions about that or how it works.
Answer 6:
This (video, pdf) is about a lock-free hash table implemented in Java.
Spoiler: it uses atomic compare-and-swap (CAS) operations; if they are not available in Ruby, you could emulate them with locks. I'm not sure that would give any advantage over simple lock-guarded hash tables.
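To make the emulation idea concrete, here is a minimal sketch that assumes no native CAS is available: a Mutex-guarded cell offering compare_and_set, plus a retry loop that swaps in a new frozen hash. EmulatedAtomicReference and cas_store are names invented for this example; it is not how the Java table from the talk is built.

```ruby
# A compare-and-set cell emulated with a Mutex (no native CAS assumed).
class EmulatedAtomicReference
  def initialize(value)
    @value = value
    @lock = Mutex.new
  end

  def get
    @lock.synchronize { @value }
  end

  # Stores new_value only if the cell still holds expected (same object).
  def compare_and_set(expected, new_value)
    @lock.synchronize do
      return false unless @value.equal?(expected)
      @value = new_value
      true
    end
  end
end

# Retry loop: build the new frozen hash outside the lock, then try to swap
# it in; if another thread got there first, start over from its version.
def cas_store(ref, key, value)
  loop do
    old = ref.get
    updated = old.merge(key => value).freeze
    return value if ref.compare_and_set(old, updated)
  end
end

table = EmulatedAtomicReference.new({}.freeze)
threads = 4.times.map do |t|
  Thread.new { 50.times { |i| cas_store(table, "#{t}-#{i}", i) } }
end
threads.each(&:join)
puts table.get.size  # 200
```

The lock is held only for a pointer comparison and an assignment, but reads still go through it in this emulation, so it is an open question whether it beats a plain lock-guarded hash.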
Answer 7:
Not tested, and a naive stab at optimizing for reads. It assumes that most of the time, the value won't be locked. If it is, the tight loop will retry until it isn't. I put Thread.critical in there to help ensure that the read threads won't be run until the write is completed. Not sure if the critical part is needed; it really depends on how read-heavy you mean, so some benchmarking is in order.
class ConcurrentHash < Hash
  def initialize(*args)
    @semaphore = Mutex.new
    super
  end

  def []=(k,v)
    begin
      old_crit = Thread.critical
      Thread.critical = true unless old_crit
      @semaphore.synchronize { super }
    ensure
      Thread.critical = old_crit
    end
  end

  def [](k)
    while(true)
      return super unless @semaphore.locked?
    end
  end
end
There may be a few other read methods that would need to check the @semaphore lock, I don't know if everything else is implemented in terms of #[].
Answer 8:
i'm pretty unclear on what is meant by this. i think the simplest implementation is simply
Hash
that is to say the built-in ruby hash is threadsafe if by threadsafe you mean will not blow up if > 1 threads tries to access it. this code will run safely forever
n = 4242
hash = {}
loop do
  a =
    Thread.new do
      n.times do
        hash[:key] = :val
      end
    end
  b =
    Thread.new do
      n.times do
        hash.delete(:key)
      end
    end
  c =
    Thread.new do
      n.times do
        val = hash[:key]
        raise val.inspect unless [nil, :val].include?(val)
      end
    end
  a.join
  b.join
  c.join
  p :THREADSAFE
end
i suspect by thread safe you really mean ACID - for instance a write like hash[:key]=:val followed by a read of hash[:key] would return :val. but no amount of trickery with locking can provide that - the last in would always win. for example, say you have 42 threads all updating a threadsafe hash - which value should be read by the 43rd?? surely by threadsafe you don't mean some sort of total ordering on writes - therefore if 42 threads were actively writing, the 'correct' value is any, right? but ruby's built-in Hash works in just this way...
perhaps you mean something like
hash.each do ...
in one thread and
hash.delete(key)
would not interfere with one another? i can imagine wanting that to be threadsafe, but that's not even safe in a single thread with the MRI ruby (obviously you cannot modify a hash while iterating over it)
so can you be more specific about what you mean by 'threadsafe' ??
the only way to give ACID semantics would be a gross lock (sure this could be a method that took a block - but still an external lock).
ruby's thread scheduler isn't just going to schedule a thread smack in the middle of some arbitrary c function (like the built-in hash aref aset methods) so those are effectively threadsafe.
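The "method that took a block" form of an external lock might look like the sketch below. TransactionalHash and transaction are names made up for this illustration; the point is only that a whole read-modify-write sequence runs under one coarse lock.

```ruby
# One coarse lock around a whole read-modify-write sequence.
class TransactionalHash
  def initialize
    @hash = {}
    @lock = Mutex.new
  end

  # Yields the underlying hash while holding the lock, so other threads
  # see the block's reads and writes as a single step.
  def transaction
    @lock.synchronize { yield @hash }
  end
end

counts = TransactionalHash.new
threads = 10.times.map do
  Thread.new do
    1000.times do
      counts.transaction { |h| h[:hits] = (h[:hits] || 0) + 1 }
    end
  end
end
threads.each(&:join)
p counts.transaction { |h| h[:hits] }  # 10000
```

Without the lock, the read of h[:hits] and the write of the incremented value are two separate hash operations, and another thread can slip in between them.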
Answer 9:
Unfortunately I can't add a comment to Michael Sofaer's answer, where he introduces class RWLock and class LockedHash with @reader_count etc. (I don't have enough karma yet.)
That solution does not work. It gives an error:
in `unlock': Attempt to unlock a mutex which is not locked (ThreadError)
The cause is a logical bug: when it's time to unlock, the unlock happens one extra time because a my_block? check is missing. The lock is released even when the current thread already held it ("is my block") and no release was needed, so the second unlock, on an already unlocked mutex, raises an exception. (I'll paste the full code to reproduce this error at the end of this post.)
Michael also mentioned that "the each method is unsafe (allows mutations by other threads during an iteration)", which was critical for me, so I ended up with this simplified solution. It works for all my use cases: it simply locks a mutex on any call to any hash method made from a different thread (calls from the thread that already owns the lock do not block, to avoid deadlocks):
#
# This TrulyThreadSafeHash works!
#
# Note if one thread iterating the hash by #each method
# then the hash will be locked for all other threads (they will not be
# able to even read from it)
#
class TrulyThreadSafeHash
  def initialize
    @mutex = Mutex.new
    @hash = Hash.new
  end

  def method_missing(method_sym, *arguments, &block)
    if !@mutex.owned? # Mutex#owned? returns true if this lock is currently held by current thread
      # We're trying to lock only if mutex is not owned by the current thread (is not locked or is locked by some other thread).
      # Following call will be blocking if mutex locked by other thread:
      @mutex.synchronize{
        return lambda{@hash.send(method_sym,*arguments, &block)}.call
      }
    end
    # We already own the lock (from current thread perspective).
    # We don't even check if @hash.respond_to?(method_sym), let's make Hash
    # respond properly on all calls (including bad calls (example: wrong method names))
    lambda{@hash.send(method_sym,*arguments, &block)}.call
  end

  # since we're trying to mimic Hash we'll pretend to respond as Hash would
  # (instance-level hook, so that respond_to? on a TrulyThreadSafeHash works)
  def respond_to_missing?(method_sym, include_private = false)
    @hash.respond_to?(method_sym, include_private) || super
  end

  # override Object's to_s because our method_missing won't be called for to_s
  def to_s(*arguments)
    @mutex.synchronize{
      return @hash.to_s
    }
  end

  # And for those, who want to run extra mile:
  # to make our class json-friendly we should require 'json' and uncomment this:
  #def to_json(*options)
  #  @mutex.synchronize{
  #    return @hash.to_json(*options)
  #  }
  #end
end
And now the full example to demonstrate / reproduce the error of double unlocking in Michael Sofaer's solution:
#!/usr/bin/env ruby
# ======= unchanged copy-paste part from Michael Sofaer answer (begin) =======
class LockedHash
  def initialize
    @hash = Hash.new
    @lock = ThreadAwareLock.new()
    @reader_count = 0
  end

  def [](key)
    @lock.lock_read
    ret = @hash[key]
    @lock.unlock_read
    ret
  end

  def []=(key, value)
    @lock.lock_write
    @hash[key] = value
    @lock.unlock_write
  end

  def method_missing(method_sym, *arguments, &block)
    if @hash.respond_to? method_sym
      @lock.lock_block
      val = lambda{@hash.send(method_sym,*arguments, &block)}.call
      @lock.unlock_block
      return val
    end
    super
  end
end

class RWLock
  def initialize
    @outer = Mutex.new
    @inner = Mutex.new
    @reader_count = 0
  end

  def lock_read
    @outer.synchronize{@inner.synchronize{@reader_count += 1}}
  end

  def unlock_read
    @inner.synchronize{@reader_count -= 1}
  end

  def lock_write
    @outer.lock
    while @reader_count > 0 ;end
  end

  def unlock_write
    @outer.unlock
  end
end

class ThreadAwareLock < RWLock
  def initialize
    @owner = nil
    super
  end

  def lock_block
    lock_write
    @owner = Thread.current.object_id
  end

  def unlock_block
    @owner = nil
    unlock_write
  end

  def lock_read
    super unless my_block?
  end

  def unlock_read
    super unless my_block?
  end

  def lock_write
    super unless my_block?
  end

  def unlock_write
    super unless my_block?
  end

  def my_block?
    @owner == Thread.current.object_id
  end
end
# ======= unchanged copy-paste part from Michael Sofaer answer (end) =======

# global hash object, which will be 'shared' across threads
$h = LockedHash.new

# hash_reader is just iterating through the 'shared' hash $h
# and prints specified delimiter (capitalized when last hash item read)
def hash_reader(delim)
  loop{
    count = 0
    $h.each{
      count += 1
      if count != $h.size
        $stderr.print delim
      else
        $stderr.puts delim.upcase
      end
    }
  }
end

# fill hash with 10 items
10.times{|i|
  $h[i] = i
}

# create a thread which will read $h hash
t1 = Thread.new(){
  hash_reader("o")
}
t1.join # will never happen, but for completeness
This gives the following error:
./LockedHash_fails_to_unlock.rb
oooooooooO
./LockedHash_fails_to_unlock.rb:55:in `unlock': Attempt to unlock a mutex which is not locked (ThreadError)
from ./LockedHash_fails_to_unlock.rb:55:in `unlock_write'
from ./LockedHash_fails_to_unlock.rb:82:in `unlock_write'
from ./LockedHash_fails_to_unlock.rb:70:in `unlock_block'
from ./LockedHash_fails_to_unlock.rb:29:in `method_missing'
from ./LockedHash_fails_to_unlock.rb:100:in `block in hash_reader'
from ./LockedHash_fails_to_unlock.rb:98:in `loop'
from ./LockedHash_fails_to_unlock.rb:98:in `hash_reader'
from ./LockedHash_fails_to_unlock.rb:119:in `block in <main>'
Answer 10:
Since you mention the Hash would be read-heavy, having one mutex locking both reads and writes would result in race conditions that are most probably won by reads. If that's OK with you, then ignore this answer.
If you want to give writes priority, a read-write lock would help. The following code is based on an old C++ assignment for an Operating Systems class, so it might not be the best quality, but it gives the general idea.
require 'thread'

class ReadWriteLock
  def initialize
    @critical_section = Mutex.new
    @are_writers_finished = ConditionVariable.new
    @are_readers_finished = ConditionVariable.new
    @readers = 0
    @writers = 0
    @writer_locked = false
  end

  def read
    begin
      start_read
      yield
    ensure
      end_read
    end
  end

  def start_read
    @critical_section.lock
    while (@writers != 0 || @writer_locked)
      @are_writers_finished.wait(@critical_section)
    end
    @readers += 1
    @critical_section.unlock
  end

  def end_read
    @critical_section.lock
    if (@readers -= 1) == 0
      @are_readers_finished.broadcast
    end
    @critical_section.unlock
  end

  def write
    begin
      start_write
      yield
    ensure
      end_write
    end
  end

  def start_write
    @critical_section.lock
    @writers += 1
    while @readers > 0
      @are_readers_finished.wait(@critical_section)
    end
    while @writer_locked
      @are_writers_finished.wait(@critical_section)
    end
    @writers -= 1
    @writer_locked = true
    @critical_section.unlock
  end

  def end_write
    @critical_section.lock
    @writer_locked = false
    @are_writers_finished.broadcast
    @critical_section.unlock
  end
end
Then just wrap []= and [] in lock.write and lock.read. Might have a performance impact, but will guarantee that writes will 'get through' the reads. Usefulness of this depends on how read heavy it actually is.
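As a rough sketch of that wrapping step: ReadWriteLockedHash below is a hypothetical wrapper that accepts any lock object responding to read { } and write { }. So that the block runs on its own, it ships with TinyReadWriteLock, a condensed writer-preferring stand-in written for this example rather than the ReadWriteLock class above.

```ruby
# Condensed stand-in for a read-write lock so this block runs alone.
class TinyReadWriteLock
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @readers = 0
    @writers_waiting = 0
    @writing = false
  end

  def read
    @mutex.synchronize do
      # New readers queue up behind any active or waiting writer.
      @cond.wait(@mutex) while @writing || @writers_waiting > 0
      @readers += 1
    end
    begin
      yield
    ensure
      @mutex.synchronize do
        @readers -= 1
        @cond.broadcast
      end
    end
  end

  def write
    @mutex.synchronize do
      @writers_waiting += 1
      @cond.wait(@mutex) while @writing || @readers > 0
      @writers_waiting -= 1
      @writing = true
    end
    begin
      yield
    ensure
      @mutex.synchronize do
        @writing = false
        @cond.broadcast
      end
    end
  end
end

class ReadWriteLockedHash
  def initialize(lock = TinyReadWriteLock.new)
    @hash = {}
    @lock = lock   # anything responding to read { } and write { }
  end

  def [](key)
    @lock.read { @hash[key] }
  end

  def []=(key, value)
    @lock.write { @hash[key] = value }
  end
end

h = ReadWriteLockedHash.new
writers = 3.times.map { |t| Thread.new { 100.times { |i| h[[t, i]] = i } } }
readers = 3.times.map { Thread.new { 100.times { |i| h[[0, i]] } } }
(writers + readers).each(&:join)
p h[[2, 99]]  # 99
```

Neither lock is reentrant, so a block passed to read or write must not call back into the same hash.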