Pure-Ruby concurrent Hash

2019-01-29 17:56发布

What's the best way to implement a Hash that can be modified across multiple threads, but with the smallest number of locks. For the purposes of this question, you can assume that the Hash will be read-heavy. It must be thread-safe in all Ruby implementations, including ones that operate in a truly simultaneous fashion, such as JRuby, and it must be written in pure-Ruby (no C or Java allowed).

Feel free to submit a naïve solution that always locks, but that isn't likely to be the best solution. Points for elegance, but a smaller likelihood of locking wins over smaller code.

10条回答
beautiful°
2楼-- · 2019-01-29 17:58

Since you mention the Hash would be read heavy, having one mutex locking both read and writes would result in race conditions that are most probably won by reads. If that's ok with you, then ignore the answer.

If you want to give writes a priority, an read-write lock would help. The following code is based on some old c++ assignment for Operating Systems class, so might not be best quality, but gives a general idea.

require 'thread'

class ReadWriteLock
  def initialize
    @critical_section = Mutex.new
    @are_writers_finished = ConditionVariable.new
    @are_readers_finished = ConditionVariable.new
    @readers = 0
    @writers = 0
    @writer_locked = false
  end

  def read
    begin
      start_read
      yield
    ensure
      end_read
    end
  end

  def start_read
    @critical_section.lock
    while (@writers != 0 || @writer_locked)
      @are_writers_finished.wait(@critical_section)
    end
    @readers += 1
    @critical_section.unlock
  end

  def end_read
    @critical_section.lock
    if (@readers -= 1) == 0
      @are_readers_finished.broadcast
    end
    @critical_section.unlock
  end

  def write
    begin
      start_write
      yield
    ensure
      end_write
    end
  end

  def start_write
    @critical_section.lock
    @writers += 1
    while @readers > 0
      @are_readers_finished.wait(@critical_section)
    end
    while @writer_locked
      @are_writers_finished.wait(@critical_section)
    end
    @writers -= 1
    @writer_locked = true
    @critical_section.unlock
  end

  def end_write
    @critical_section.lock
    @writer_locked = false
    @are_writers_finished.broadcast
    @critical_section.unlock
  end
end

Then just wrap []= and [] in lock.write and lock.read. Might have a performance impact, but will guarantee that writes will 'get through' the reads. Usefulness of this depends on how read heavy it actually is.

查看更多
Summer. ? 凉城
3楼-- · 2019-01-29 17:59

Unfortunately I can't add a comment to Michael Sofaer answer where he introduces: class RWLock and class LockedHash with @reader_count etc. (don't have enough karma yet)

That solution does not work. It gives an error: in `unlock': Attempt to unlock a mutex which is not locked (ThreadError)

Due to the logical bug: when it's time to unlock things then unlock happens 1 extra time (because of missing check my_block?(). Instead it unblocks it even if unblocking was not necessary "is my block") and so 2nd unlock on already unlocked mutes raises an exception. (I'll paste full code on how to reproduce this error at the end of this post).

Also Michael mentioned "the each method is unsafe (allows mutations by other threads during an iteration)" which was critical for me, so I end up with this simplified solution which works for all my use cases and it simply locks mutex on any call to any hash method when called from different thread (calls from the same thread, which owns the lock are not blocking to avoid deadlocks):

#
# This TrulyThreadSafeHash works!
#
# Note if one thread iterating the hash by #each method
# then the hash will be locked for all other threads (they will not be 
# able to even read from it)
#
class TrulyThreadSafeHash
  def initialize
    @mutex = Mutex.new
    @hash = Hash.new
  end

  def method_missing(method_sym, *arguments, &block)

    if !@mutex.owned?  # Returns true if this lock is currently held by current thread
        # We're trying to lock only if mutex is not owned by the current thread (is not locked or is locked by some other thread).
        # Following call will be blocking if mutex locked by other thread:
        @mutex.synchronize{
            return lambda{@hash.send(method_sym,*arguments, &block)}.call
        }
    end

    # We already own the lock (from current thread perspective).
    # We don't even check if @hash.respond_to?(method_sym), let's make Hash
    # respond properly on all calls (including bad calls (example: wrong method names))
    lambda{@hash.send(method_sym,*arguments, &block)}.call
  end

  # since we're tyring to mimic Hash we'll pretend to respond as Hash would
  def self.respond_to?(method_sym, include_private = false)
    Hash.respond_to(method_sym, include_private)
  end

  # override Object's to_s because our method_missing won't be called for to_s
  def to_s(*arguments)
      @mutex.synchronize{
        return @hash.to_s
      }
  end

  # And for those, who want to run extra mile:
  # to make our class json-friendly we shoud require 'json' and uncomment this:
  #def to_json(*options)
  #    @mutex.synchronize{
  #        return @hash.to_json(*options)
  #    }
  #end

end

And now the full example to demonstrate / reproduce the error of double unlocking in Michael Sofaer's solution:

#!/usr/bin/env ruby

# ======= unchanged copy-paste part from Michael Sofaer answer (begin) =======

class LockedHash
  def initialize
    @hash = Hash.new
    @lock = ThreadAwareLock.new()
    @reader_count = 0
  end

  def [](key)
    @lock.lock_read
    ret = @hash[key]
    @lock.unlock_read
    ret
  end

  def []=(key, value)
    @lock.lock_write
    @hash[key] = value
    @lock.unlock_write
  end

  def method_missing(method_sym, *arguments, &block)
    if @hash.respond_to? method_sym
      @lock.lock_block
      val = lambda{@hash.send(method_sym,*arguments, &block)}.call
      @lock.unlock_block
      return val
    end
    super
  end
end



class RWLock
  def initialize
    @outer = Mutex.new
    @inner = Mutex.new
    @reader_count = 0
  end
  def lock_read
    @outer.synchronize{@inner.synchronize{@reader_count += 1}}
  end
  def unlock_read
    @inner.synchronize{@reader_count -= 1}
  end
  def lock_write
    @outer.lock
    while @reader_count > 0 ;end
  end
  def unlock_write
    @outer.unlock
  end
end

class ThreadAwareLock < RWLock
  def initialize
    @owner = nil
    super
  end
  def lock_block
    lock_write
    @owner = Thread.current.object_id
  end
  def unlock_block
    @owner = nil
    unlock_write
  end
  def lock_read
    super unless my_block?
  end
  def unlock_read
    super unless my_block?
  end
  def lock_write
    super unless my_block?
  end
  def unlock_write
    super unless my_block?
  end
  def my_block?
    @owner == Thread.current.object_id
  end
end

# ======= unchanged copy-paste part from Michael Sofaer answer (end) =======


# global hash object, which will be 'shared' across threads
$h = LockedHash.new

# hash_reader is just iterating through the 'shared' hash $h
# and prints specified delimeter (capitalized when last hash item read)
def hash_reader(delim)
    loop{
        count = 0
        $h.each{
            count += 1
            if count != $h.size
                $stderr.print delim
            else
                $stderr.puts delim.upcase
            end
        }
    }
end

# fill hash with 10 items
10.times{|i|
    $h[i] = i
}

# create a thread which will read $h hash
t1 = Thread.new(){
    hash_reader("o")
}

t1.join  # will never happen, but for completeness

, which gives the following error:

./LockedHash_fails_to_unlock.rb
oooooooooO
./LockedHash_fails_to_unlock.rb:55:in `unlock': Attempt to unlock a mutex which is not locked (ThreadError)
        from ./LockedHash_fails_to_unlock.rb:55:in `unlock_write'
        from ./LockedHash_fails_to_unlock.rb:82:in `unlock_write'
        from ./LockedHash_fails_to_unlock.rb:70:in `unlock_block'
        from ./LockedHash_fails_to_unlock.rb:29:in `method_missing'
        from ./LockedHash_fails_to_unlock.rb:100:in `block in hash_reader'
        from ./LockedHash_fails_to_unlock.rb:98:in `loop'
        from ./LockedHash_fails_to_unlock.rb:98:in `hash_reader'
        from ./LockedHash_fails_to_unlock.rb:119:in `block in <main>'
查看更多
干净又极端
4楼-- · 2019-01-29 18:00

Okay, now that you specified the actually meaning of 'threadsafe', here are two potential implementations. The following code will run forever in MRI and JRuby. The lockless implementation follows an eventual consistency model where each thread uses it's own view of the hash if the master is in flux. There is a little trickery required to make sure storing all the information in the thread doesn't leak memory, but that is handled and tested ― process size does not grow running this code. Both implementations would need more work to be 'complete', meaning delete, update, etc. would need some thinking, but either of the two concepts below will meet your requirements.

It's very important for people reading this thread to realize the whole issue is exclusive to JRuby ― in MRI the built-in Hash is sufficient.

module Cash
  def Cash.new(*args, &block)
    env = ENV['CASH_IMPL']
    impl = env ? Cash.const_get(env) : LocklessImpl
    klass = defined?(JRUBY_VERSION) ? impl : ::Hash
    klass.new(*args)
  end

  class LocklessImpl
    def initialize
      @hash = {}
    end

    def thread_hash
      thread = Thread.current
      thread[:cash] ||= {}
      hash = thread[:cash][thread_key]
      if hash
        hash
      else
        hash = thread[:cash][thread_key] = {}
        ObjectSpace.define_finalizer(self){ thread[:cash].delete(thread_key) }
        hash
      end
    end

    def thread_key
      [Thread.current.object_id, object_id]
    end

    def []=(key, val)
      time = Time.now.to_f
      tuple = [time, val]
      @hash[key] = tuple
      thread_hash[key] = tuple
      val
    end

    def [](key)
    # check the master value
    #
      val = @hash[key]

    # someone else is either writing the key or it has never been set.  we
    # need to invalidate our own copy in either case
    #
      if val.nil?
        thread_val = thread_hash.delete(key)
        return(thread_val ? thread_val.last : nil)
      end

    # check our own thread local value
    #
      thread_val = thread_hash[key]

    # in this case someone else has written a value that we have never seen so
    # simply return it
    #
      if thread_val.nil?
        return(val.last)
      end

    # in this case there is a master *and* a thread local value, if the master
    # is newer juke our own cached copy
    #
      if val.first > thread_val.first
        thread_hash.delete(key)
        return val.last
      else
        return thread_val.last
      end
    end
  end

  class LockingImpl < ::Hash
    require 'sync'

    def initialize(*args, &block)
      super
    ensure
      extend Sync_m
    end

    def sync(*args, &block)
      sync_synchronize(*args, &block)
    end

    def [](key)
      sync(:SH){ super }
    end

    def []=(key, val)
      sync(:EX){ super }
    end
  end
end



if $0 == __FILE__
  iteration = 0

  loop do
    n = 42
    hash = Cash.new

    threads =
      Array.new(10) {
        Thread.new do
          Thread.current.abort_on_exception = true
          n.times do |key|
            hash[key] = key
            raise "#{ key }=nil" if hash[key].nil?
          end
        end
      }

    threads.map{|thread| thread.join}

    puts "THREADSAFE: #{ iteration += 1 }"
  end
end
查看更多
祖国的老花朵
5楼-- · 2019-01-29 18:04

Posting base/naive solution, just to boost my Stack Overflow cred:

require 'thread'

class ConcurrentHash < Hash
  def initialize
    super
    @mutex = Mutex.new
  end

  def [](*args)
    @mutex.synchronize { super }
  end

  def []=(*args)
    @mutex.synchronize { super }
  end
end
查看更多
Rolldiameter
6楼-- · 2019-01-29 18:09

i'm pretty unclear on what is meant by this. i think the simplest implementation is simply

Hash

that is to say the built-in ruby hash is threadsafe if by threadsafe you mean will not blow up if > 1 threads tries to access it. this code will run safely forever

n = 4242
hash = {}

loop do
  a =
    Thread.new do
      n.times do
        hash[:key] = :val
      end
    end

  b =
    Thread.new do
      n.times do
        hash.delete(:key)
      end
    end

  c =
    Thread.new do
      n.times do
        val = hash[:key]
        raise val.inspect unless [nil, :val].include?(val)
      end
    end

  a.join
  b.join
  c.join
  p :THREADSAFE
end

i suspect by thread safe you really mean ACID - for instance a write like hash[:key]=:val followed by a read if has[:key] would return :val. but no amount of trickery with locking can provide that - the last in would always win. for example, say you have 42 thread all updating a threadsafe hash - which value should be read by the 43'rd?? surely by threasafe you don't mean some sort of total ordering on writes - therefore if 42 threads were actively writing the 'correct' value is any right? but ruby's built-in Hash works in just this way...

perhaps you mean something like

hash.each do ...

in one thread and

hash.delete(key)

would not interfere with one another? i can imagine wanting that to be threadsafe, but that's not even safe in a single thread with the MRI ruby (obviously you cannot modify a hash while iterating over it)

so can you be more specific about what you mean by 'threadsafe' ??

the only way to give ACID semantics would be a gross lock (sure this could be a method that took a block - but still an external lock).

ruby's thread scheduler isn't just going to schedule a thread smack in the middle of some arbitrary c function (like the built-in hash aref aset methods) so those are effectively threadsafe.

查看更多
虎瘦雄心在
7楼-- · 2019-01-29 18:10

This might be a use case for the hamster gem

Hamster implements Hash Array Mapped Tries (HAMT), as well as some other persistent data structures, in pure Ruby.

Persistent data structures are immutable, and instead of mutating (changing) the structure, such as by adding or replacing a key-value pair in a Hash, you instead return a new data structure which contains the change. The trick, with the persistent immutable data structures, is that the newly returned data structure re-uses as much of the predecessor as possible.

I think to implement using hamster, you would use their mutable hash wrapper, which passes all reads to the current value of the persistent immutable hash (ie, should be fast), while guarding all writes with a mutex, and swapping to the new value of the persistent immutable hash after the write.

For example:

require 'hamster'
require 'hamster/experimental/mutable_hash'    
hsh = Hamster.mutable_hash(:name => "Simon", :gender => :male)

# reading goes directly to hash
puts hsh[:name] # Simon

# writing is actually swapping to new value of underlying persistent data structure
hsh.put(:name, "Joe")
puts hsh[:name] # Joe

So, let's use this for a similar type of problem to the one described:

(gist here)

require 'hamster'
require 'hamster/experimental/mutable_hash'

# a bunch of threads with a read/write ratio of 10:1
num_threads = 100
num_reads_per_write = 10
num_loops = 100 
hsh = Hamster.mutable_hash

puts RUBY_DESCRIPTION
puts "#{num_threads} threads x #{num_loops} loops, #{num_reads_per_write}:1 R/W ratio"

t0 = Time.now
Thread.abort_on_exception = true
threads = (0...num_threads).map do |n|
  Thread.new do
    write_key = n % num_reads_per_write
    read_keys = (0...num_reads_per_write).to_a.shuffle # random order
    last_read = nil

    num_loops.times do
      read_keys.each do |k|
        # Reads
        last_read = hsh[k]

        Thread.pass

        # Atomic increments in the correct ratio to reads
        hsh.put(k) { |v| (v || 0) + 1 } if k == write_key
      end
    end
  end
end

threads.map { |t| t.join }
t1 = Time.now

puts "Error in keys" unless (0...num_reads_per_write).to_a == hsh.keys.sort.to_a
puts "Error in values" unless hsh.values.all? { |v| v == (num_loops * num_threads) / num_reads_per_write }
puts "Time elapsed: #{t1 - t0} s"

I'm getting the following outputs:

ruby 1.9.2p320 (2012-04-20 revision 35421) [x86_64-linux]
100 threads x 100 loops, 10:1 R/W ratio
Time elapsed: 5.763414627 s

jruby 1.7.0 (1.9.3p203) 2012-10-22 ff1ebbe on Java HotSpot(TM) 64-Bit Server VM 1.6.0_26-b03 [linux-amd64]
100 threads x 100 loops, 10:1 R/W ratio
Time elapsed: 1.697 s

What do you think of this?

This solution is more similar to how one might solve this in Scala or Clojure, although in those languages one would more likely be using software transactional memory with low-level CPU support for the atomic compare and swap operations which are implemented.

Edit: It's worth noting that one reason the hamster implementation is fast is that it features a lock-free read path. Please reply in comments if you have questions about that or how it works.

查看更多
登录 后发表回答