纯Ruby并发哈希(Pure-Ruby concurrent Hash)

2019-06-24 22:57发布

什么是实现能够在多个线程进行修改散列的最好方式,但随着锁的最小数量。 对于这个问题的目的,你可以假设哈希将被读取重。 它必须是线程安全的在所有的Ruby实现,包括在一个真正的同时方式运作的,如JRuby和它必须在纯Ruby编写(没有C或Java允许)。

随意提交一个天真的解决方案,始终锁定,但是这不可能是最好的解决方案。 分优雅,但在较小的代码锁定胜利的可能性较小。

Answer 1:

好了,现在你指定的“线程”的实际意义,这里有两种可能的实现。 下面的代码将在MRI和JRuby永远运行。 该无锁定的实施遵循其中每个线程使用它自己的散列的观点,如果主人是在不断变化的最终一致性模型。 还有就是要确保存储在线程的所有信息不会泄露内存要求有点挂羊头卖狗肉,但被处理和测试 - 进程大小不长运行此代码。 这两种实现方式需要更多的工作是“完整的”,意思是删除,更新等,都需要一些思考,但无论是两个概念下面将满足您的要求。

在MRI内置散列是不够的 - 人们阅读此线程来实现整个问题是独家的JRuby这是非常重要的。

module Cash
  def Cash.new(*args, &block)
    env = ENV['CASH_IMPL']
    impl = env ? Cash.const_get(env) : LocklessImpl
    klass = defined?(JRUBY_VERSION) ? impl : ::Hash
    klass.new(*args)
  end

  class LocklessImpl
    def initialize
      @hash = {}
    end

    def thread_hash
      thread = Thread.current
      thread[:cash] ||= {}
      hash = thread[:cash][thread_key]
      if hash
        hash
      else
        hash = thread[:cash][thread_key] = {}
        ObjectSpace.define_finalizer(self){ thread[:cash].delete(thread_key) }
        hash
      end
    end

    def thread_key
      [Thread.current.object_id, object_id]
    end

    def []=(key, val)
      time = Time.now.to_f
      tuple = [time, val]
      @hash[key] = tuple
      thread_hash[key] = tuple
      val
    end

    def [](key)
    # check the master value
    #
      val = @hash[key]

    # someone else is either writing the key or it has never been set.  we
    # need to invalidate our own copy in either case
    #
      if val.nil?
        thread_val = thread_hash.delete(key)
        return(thread_val ? thread_val.last : nil)
      end

    # check our own thread local value
    #
      thread_val = thread_hash[key]

    # in this case someone else has written a value that we have never seen so
    # simply return it
    #
      if thread_val.nil?
        return(val.last)
      end

    # in this case there is a master *and* a thread local value, if the master
    # is newer juke our own cached copy
    #
      if val.first > thread_val.first
        thread_hash.delete(key)
        return val.last
      else
        return thread_val.last
      end
    end
  end

  class LockingImpl < ::Hash
    require 'sync'

    def initialize(*args, &block)
      super
    ensure
      extend Sync_m
    end

    def sync(*args, &block)
      sync_synchronize(*args, &block)
    end

    def [](key)
      sync(:SH){ super }
    end

    def []=(key, val)
      sync(:EX){ super }
    end
  end
end



if $0 == __FILE__
  iteration = 0

  loop do
    n = 42
    hash = Cash.new

    threads =
      Array.new(10) {
        Thread.new do
          Thread.current.abort_on_exception = true
          n.times do |key|
            hash[key] = key
            raise "#{ key }=nil" if hash[key].nil?
          end
        end
      }

    threads.map{|thread| thread.join}

    puts "THREADSAFE: #{ iteration += 1 }"
  end
end


Answer 2:

发帖基地/天真的解决方案,只是为了提高我的堆栈溢出CRED:

require 'thread'

class ConcurrentHash < Hash
  def initialize
    super
    @mutex = Mutex.new
  end

  def [](*args)
    @mutex.synchronize { super }
  end

  def []=(*args)
    @mutex.synchronize { super }
  end
end


Answer 3:

耶胡达,我想你提到的伊娃设置是原子? 怎么样简单的复制和交换呢?

require 'thread'

class ConcurrentHash
  def initialize
    @reader, @writer = {}, {}
    @lock = Mutex.new
  end

  def [](key)
    @reader[key]
  end

  def []=(key, value)
    @lock.synchronize {
      @writer[key] = value
      @reader, @writer = @writer, @reader
      @writer[key] = value
    }
  end
end


Answer 4:

这是围绕哈希的包装类,允许并发读取,但锁定下来的东西对于所有其他类型的访问(包括迭代读取)。

class LockedHash
  def initialize
    @hash = Hash.new
    @lock = ThreadAwareLock.new()
    @reader_count = 0
  end

  def [](key)
    @lock.lock_read
    ret = @hash[key]
    @lock.unlock_read
    ret
  end

  def []=(key, value)
    @lock.lock_write
    @hash[key] = value
    @lock.unlock_write
  end

  def method_missing(method_sym, *arguments, &block)
    if @hash.respond_to? method_sym
      @lock.lock_block
      val = lambda{@hash.send(method_sym,*arguments, &block)}.call
      @lock.unlock_block
      return val
    end
    super
  end
end

下面是它采用了锁定代码:

class RWLock
  def initialize
    @outer = Mutex.new
    @inner = Mutex.new
    @reader_count = 0
  end
  def lock_read
    @outer.synchronize{@inner.synchronize{@reader_count += 1}}
  end
  def unlock_read
    @inner.synchronize{@reader_count -= 1}
  end
  def lock_write
    @outer.lock
    while @reader_count > 0 ;end
  end
  def unlock_write
    @outer.unlock
  end
end

class ThreadAwareLock < RWLock
  def initialize
    @owner = nil
    super
  end
  def lock_block
    lock_write
    @owner = Thread.current.object_id
  end
  def unlock_block
    @owner = nil
    unlock_write
  end
  def lock_read
    super unless my_block?
  end
  def unlock_read
    super unless my_block?
  end
  def lock_write
    super unless my_block?
  end
  def unlock_write
    super unless my_block?
  end
  def my_block?
    @owner == Thread.current.object_id
  end
end

该线程感知锁,让您一次锁定类,然后调用,通常会锁定方式,并让他们无法锁定。 你需要这个,因为你得为一些方法里面块,这些块可以调用该对象的锁定方法,以及你不想死锁或双锁错误。 你可以使用一个计数的锁,而不是这个。

下面是实现斗级读写锁的尝试:

class SafeBucket
  def initialize
    @lock = RWLock.new()
    @value_pairs = []
  end

  def get(key)
    @lock.lock_read
    pair = @value_pairs.select{|p| p[0] == key}
    unless pair && pair.size > 0
      @lock.unlock_read
      return nil
    end
    ret = pair[0][1]
    @lock.unlock_read
    ret
  end

  def set(key, value)
    @lock.lock_write
    pair = @value_pairs.select{|p| p[0] == key}
    if pair && pair.size > 0
      pair[0][1] = value
      @lock.unlock_write
      return
    end
    @value_pairs.push [key, value]
    @lock.unlock_write
    value
  end

  def each
    @value_pairs.each{|p| yield p[0],p[1]}
  end

end

class MikeConcurrentHash
  def initialize
    @buckets = []
    100.times {@buckets.push SafeBucket.new}
  end

  def [](key)
    bucket(key).get(key)
  end

  def []=(key, value)
    bucket(key).set(key, value)
  end

  def each
    @buckets.each{|b| b.each{|key, value| yield key, value}}
  end

  def bucket(key)
    @buckets[key.hash % 100]
  end
end

我停在这,因为它是太慢了工作,所以每个方法是不安全的(允许其他线程突变的迭代过程中),它不支持最哈希方法。

而且这里有一个测试工具为并行哈希:

require 'thread'
class HashHarness
  Keys = [:a, :basic, :test, :harness, :for, :concurrent, :testing, :of, :hashes,
          :that, :tries, :to, :provide, :a, :framework, :for, :designing, :a, :good, :ConcurrentHash,
          :for, :all, :ruby, :implementations]

  def self.go
    h = new
    r = h.writiness_range(20, 10000, 0, 0)
    r.each{|k, v| p k + ' ' + v.map{|p| p[1]}.join(' ')}
    return
  end
  def initialize(classes = [MikeConcurrentHash, JoshConcurrentHash, JoshConcurrentHash2, PaulConcurrentHash, LockedHash, Hash])
    @classes = classes
  end
  def writiness_range(basic_threads, ops, each_threads, loops)
    result = {}
    @classes.each do |hash_class|
      res = []
      0.upto 10 do |i|
        writiness = i.to_f / 10
        res.push [writiness,test_one(hash_class, basic_threads, ops, each_threads, loops, writiness)]
      end
      result[hash_class.name] = res
    end
    result
  end
  def test_one(hash_class, basic_threads, ops, each_threads, loops, writiness)
    time = Time.now
    threads = []
    hash = hash_class.new
    populate_hash(hash)
    begin
    basic_threads.times do
      threads.push Thread.new{run_basic_test(hash, writiness, ops)}
    end
    each_threads.times do
      threads.push Thread.new{run_each_test(hash, writiness, loops)}
    end
    threads.each{|t| t.join}
    rescue ThreadError => e
      p [e.message, hash_class.name, basic_threads, ops, each_threads, loops, writiness].join(' ')
      return -1
    end
    p [hash_class.name, basic_threads, ops, each_threads, loops, writiness, Time.now - time].join(' ')
    return Time.now - time
  end
  def run_basic_test(hash, writiness, ops)
    ops.times do
      rand < writiness ? hash[choose_key]= rand : hash[choose_key]
    end
  end
  def run_each_test(hash, writiness, loops)
    loops.times do
      hash.each do |k, v|
        if rand < writiness
          each_write_work(hash, k, v)
        else
          each_read_work(k, v)
        end
      end
    end
  end
  def each_write_work(hash, key, value)
    hash[key] = rand
  end
  def each_read_work(key, value)
    key.to_s + ": " + value.to_s
  end
  def choose_key
    Keys[rand(Keys.size)]
  end
  def populate_hash(hash)
    Keys.each{|key| hash[key]=rand}  
  end
end

编号:JRUBY

Writiness      0.0   0.1   0.2   0.3   0.4   0.5   0.6   0.7   0.8   0.9   1.0
ConcurrentHash 2.098 3.179 2.971 3.083 2.731 2.941 2.564 2.480 2.369 1.862 1.881
LockedHash     1.873 1.896 2.085 2.058 2.001 2.055 1.904 1.921 1.873 1.841 1.630
Hash           0.530 0.672 0.685 0.822 0.719 0.877 0.901 0.931 0.942 0.950 1.001

和MRI

Writiness      0.0    0.1    0.2    0.3    0.4    0.5    0.6    0.7    0.8    0.9    1.0
ConcurrentHash  9.214  9.913  9.064 10.112 10.240 10.574 10.566 11.027 11.323 11.837 13.036
LockedHash     19.593 17.712 16.998 17.045 16.687 16.609 16.647 15.307 14.464 13.931 14.146
Hash            0.535  0.537  0.534  0.599  0.594  0.676  0.635  0.650  0.654  0.661  0.692

MRI数字是相当惊人的。 在MRI锁定真的很烂。



Answer 5:

这可能是一个用例的仓鼠宝石

仓鼠实现散列阵列映射尝试次数(HAMT) ,以及一些其他持久性数据结构 ,在纯Ruby。

持久数据结构是不可改变的,并且代替突变(变化)的结构中,例如通过添加或在哈希替换键 - 值对,则代替返回一个新的数据结构,其中包含的变化。 这一招,与持久不变的数据结构,是新返回的数据结构重新使用尽可能多的前任可能的。

我想用仓鼠,你会用自己的可变散包装,其通过所有读取到持久不变的哈希值的电流值(即,要快)来实现,而守着所有互斥写入和交换为新的值的写操作之后的持久不变的哈希值。

例如:

require 'hamster'
require 'hamster/experimental/mutable_hash'    
hsh = Hamster.mutable_hash(:name => "Simon", :gender => :male)

# reading goes directly to hash
puts hsh[:name] # Simon

# writing is actually swapping to new value of underlying persistent data structure
hsh.put(:name, "Joe")
puts hsh[:name] # Joe

所以,让我们用这个来描述的类型相似的问题:

( 要点这里 )

require 'hamster'
require 'hamster/experimental/mutable_hash'

# a bunch of threads with a read/write ratio of 10:1
num_threads = 100
num_reads_per_write = 10
num_loops = 100 
hsh = Hamster.mutable_hash

puts RUBY_DESCRIPTION
puts "#{num_threads} threads x #{num_loops} loops, #{num_reads_per_write}:1 R/W ratio"

t0 = Time.now
Thread.abort_on_exception = true
threads = (0...num_threads).map do |n|
  Thread.new do
    write_key = n % num_reads_per_write
    read_keys = (0...num_reads_per_write).to_a.shuffle # random order
    last_read = nil

    num_loops.times do
      read_keys.each do |k|
        # Reads
        last_read = hsh[k]

        Thread.pass

        # Atomic increments in the correct ratio to reads
        hsh.put(k) { |v| (v || 0) + 1 } if k == write_key
      end
    end
  end
end

threads.map { |t| t.join }
t1 = Time.now

puts "Error in keys" unless (0...num_reads_per_write).to_a == hsh.keys.sort.to_a
puts "Error in values" unless hsh.values.all? { |v| v == (num_loops * num_threads) / num_reads_per_write }
puts "Time elapsed: #{t1 - t0} s"

我得到了以下成果:

ruby 1.9.2p320 (2012-04-20 revision 35421) [x86_64-linux]
100 threads x 100 loops, 10:1 R/W ratio
Time elapsed: 5.763414627 s

jruby 1.7.0 (1.9.3p203) 2012-10-22 ff1ebbe on Java HotSpot(TM) 64-Bit Server VM 1.6.0_26-b03 [linux-amd64]
100 threads x 100 loops, 10:1 R/W ratio
Time elapsed: 1.697 s

你觉得这怎么样?

该解决方案更类似于一个会如何解决这个斯卡拉或Clojure的,虽然这些语言中的一种更可能是利用软件事务内存的低级别的CPU支持,这是实现原子比较和交换操作。

编辑 :这是值得注意的一个原因仓鼠实现较快的是,它的特点是无锁的读取路径 。 请评论进行回复,如果你有什么,否则它是如何工作的问题。



Answer 6:

这个( 视频 , PDF格式 )大约是用Java实现无锁的哈希表。

扰流板:使用原子比较并交换(CAS)操作,如果无法在Ruby中,你可以用锁效仿他们。 不知道这会给任何优势简单的锁把守哈希表



Answer 7:

没测试过,并在优化天真刺读取。 它假定大部分的时间,该值将不会被锁定。 如果是,紧循环会尝试,直到它。 我把Thread.critical在那里帮助确保读取线程不会运行,直到写操作完成。 不知道是否需要的重要组成部分,它实际上取决于如何读重你的意思,所以一些基准测试是为了。

class ConcurrentHash < Hash

  def initialize(*args)
    @semaphore = Mutex.new
    super
  end

  def []=(k,v)
    begin
      old_crit = Thread.critical
      Thread.critical = true unless old_crit
      @semaphore.synchronize { super }
    ensure
      Thread.critical = old_crit
    end
  end

  def [](k)
    while(true)
      return super unless @semaphore.locked?
    end
  end

end

有可能是需要检查@semaphore锁定一些其他的读取方法,我不知道,如果一切在#[]项中实现。



Answer 8:

我是对什么是这个意思不太清楚。 我认为最简单的实现很简单

Hash

也就是说内置红宝石哈希线程安全的,如果由线程你的意思是不会炸毁如果> 1个线程试图访问它。 此代码将安全运行永远

n = 4242
hash = {}

loop do
  a =
    Thread.new do
      n.times do
        hash[:key] = :val
      end
    end

  b =
    Thread.new do
      n.times do
        hash.delete(:key)
      end
    end

  c =
    Thread.new do
      n.times do
        val = hash[:key]
        raise val.inspect unless [nil, :val].include?(val)
      end
    end

  a.join
  b.join
  c.join
  p :THREADSAFE
end

我通过线程安全的怀疑你真的是酸 - 例如像哈希写[:键] =:VAL接着读,如果有[:键]会返回:VAL。 但锁定没有弄虚作假的数量可以提供 - 最后在总是赢。 例如,假设你有42个线程中的所有更新线程哈希 - 这价值应该由43'rd读? 当然通过threasafe你不是说在写某种整体排序的-因此,如果42个线程正在积极编写的“正确”值权利? 但Ruby的内置散列就是以这种方式...

也许你的意思是这样

hash.each do ...

在一个线程中和

hash.delete(key)

不会互相干扰? 我可以想像想这是线程安全的,但是这不是连安全与MRI红宝石(显然在遍历它,你不能修改散列) 单个线程

所以你能具体谈谈你的意思是“线程”是什么?

给ACID语义的唯一方法是粗暴锁(相信这可能是拿了块的方法 - 但还是外部锁)。

Ruby的线程调度不只是要在一些任意的C函数的中间嫌调度线程(如内置散列AREF ASET方法),所以这些都是有效的线程安全的。



Answer 9:

不幸的是,他介绍我不能添加到迈克尔索费尔答案评论:rwlock的类和类LockedHash与@reader_count等(没有足够的人缘还)

该解决方案是行不通的。 它给出了一个错误:在`解锁':尝试解除未锁定的互斥(ThreadError)

由于逻辑错误:当它的时间来解锁的事情再解开发生1周额外的时间(?因为缺少检查my_block(的),相反,它放开它,即使疏通是没有必要的“是我的块”),并就这么第二解锁已解锁静音引发一个例外。 (我会贴在如何在这篇文章的末尾重现此错误完整的代码)。

此外迈克尔提到的“每一个方法是不安全的(允许一个迭代过程中被其他线程突变)”,这对我来说是至关重要的,所以我结束了它的工作原理为我所有的使用情况下,这种简化的解决方案,它只是锁定到任何呼叫互斥来自不同线程调用时任何散列法(从同一个线程,该公司拥有锁呼叫不阻挡,以避免死锁):

#
# This TrulyThreadSafeHash works!
#
# Note if one thread iterating the hash by #each method
# then the hash will be locked for all other threads (they will not be 
# able to even read from it)
#
class TrulyThreadSafeHash
  def initialize
    @mutex = Mutex.new
    @hash = Hash.new
  end

  def method_missing(method_sym, *arguments, &block)

    if !@mutex.owned?  # Returns true if this lock is currently held by current thread
        # We're trying to lock only if mutex is not owned by the current thread (is not locked or is locked by some other thread).
        # Following call will be blocking if mutex locked by other thread:
        @mutex.synchronize{
            return lambda{@hash.send(method_sym,*arguments, &block)}.call
        }
    end

    # We already own the lock (from current thread perspective).
    # We don't even check if @hash.respond_to?(method_sym), let's make Hash
    # respond properly on all calls (including bad calls (example: wrong method names))
    lambda{@hash.send(method_sym,*arguments, &block)}.call
  end

  # since we're tyring to mimic Hash we'll pretend to respond as Hash would
  def self.respond_to?(method_sym, include_private = false)
    Hash.respond_to(method_sym, include_private)
  end

  # override Object's to_s because our method_missing won't be called for to_s
  def to_s(*arguments)
      @mutex.synchronize{
        return @hash.to_s
      }
  end

  # And for those, who want to run extra mile:
  # to make our class json-friendly we shoud require 'json' and uncomment this:
  #def to_json(*options)
  #    @mutex.synchronize{
  #        return @hash.to_json(*options)
  #    }
  #end

end

而现在的完整的例子来演示/再现迈克尔索费尔的解决方案双重解锁的错误:

#!/usr/bin/env ruby

# ======= unchanged copy-paste part from Michael Sofaer answer (begin) =======

class LockedHash
  def initialize
    @hash = Hash.new
    @lock = ThreadAwareLock.new()
    @reader_count = 0
  end

  def [](key)
    @lock.lock_read
    ret = @hash[key]
    @lock.unlock_read
    ret
  end

  def []=(key, value)
    @lock.lock_write
    @hash[key] = value
    @lock.unlock_write
  end

  def method_missing(method_sym, *arguments, &block)
    if @hash.respond_to? method_sym
      @lock.lock_block
      val = lambda{@hash.send(method_sym,*arguments, &block)}.call
      @lock.unlock_block
      return val
    end
    super
  end
end



class RWLock
  def initialize
    @outer = Mutex.new
    @inner = Mutex.new
    @reader_count = 0
  end
  def lock_read
    @outer.synchronize{@inner.synchronize{@reader_count += 1}}
  end
  def unlock_read
    @inner.synchronize{@reader_count -= 1}
  end
  def lock_write
    @outer.lock
    while @reader_count > 0 ;end
  end
  def unlock_write
    @outer.unlock
  end
end

class ThreadAwareLock < RWLock
  def initialize
    @owner = nil
    super
  end
  def lock_block
    lock_write
    @owner = Thread.current.object_id
  end
  def unlock_block
    @owner = nil
    unlock_write
  end
  def lock_read
    super unless my_block?
  end
  def unlock_read
    super unless my_block?
  end
  def lock_write
    super unless my_block?
  end
  def unlock_write
    super unless my_block?
  end
  def my_block?
    @owner == Thread.current.object_id
  end
end

# ======= unchanged copy-paste part from Michael Sofaer answer (end) =======


# global hash object, which will be 'shared' across threads
$h = LockedHash.new

# hash_reader is just iterating through the 'shared' hash $h
# and prints specified delimeter (capitalized when last hash item read)
def hash_reader(delim)
    loop{
        count = 0
        $h.each{
            count += 1
            if count != $h.size
                $stderr.print delim
            else
                $stderr.puts delim.upcase
            end
        }
    }
end

# fill hash with 10 items
10.times{|i|
    $h[i] = i
}

# create a thread which will read $h hash
t1 = Thread.new(){
    hash_reader("o")
}

t1.join  # will never happen, but for completeness

,它提供了以下错误:

./LockedHash_fails_to_unlock.rb
oooooooooO
./LockedHash_fails_to_unlock.rb:55:in `unlock': Attempt to unlock a mutex which is not locked (ThreadError)
        from ./LockedHash_fails_to_unlock.rb:55:in `unlock_write'
        from ./LockedHash_fails_to_unlock.rb:82:in `unlock_write'
        from ./LockedHash_fails_to_unlock.rb:70:in `unlock_block'
        from ./LockedHash_fails_to_unlock.rb:29:in `method_missing'
        from ./LockedHash_fails_to_unlock.rb:100:in `block in hash_reader'
        from ./LockedHash_fails_to_unlock.rb:98:in `loop'
        from ./LockedHash_fails_to_unlock.rb:98:in `hash_reader'
        from ./LockedHash_fails_to_unlock.rb:119:in `block in <main>'


Answer 10:

既然你提到的散列要读重,有一个互斥对象锁定读取和写入会导致由读取最有可能赢得比赛的条件。 如果这是确定和你在一起,那么忽略了答案。

如果你想给写一个优先的读写锁会有所帮助。 下面的代码是基于操作系统类的一些旧的C ++分配,所以可能不是最好的质量,而是给出了一个大致的了解。

require 'thread'

class ReadWriteLock
  def initialize
    @critical_section = Mutex.new
    @are_writers_finished = ConditionVariable.new
    @are_readers_finished = ConditionVariable.new
    @readers = 0
    @writers = 0
    @writer_locked = false
  end

  def read
    begin
      start_read
      yield
    ensure
      end_read
    end
  end

  def start_read
    @critical_section.lock
    while (@writers != 0 || @writer_locked)
      @are_writers_finished.wait(@critical_section)
    end
    @readers += 1
    @critical_section.unlock
  end

  def end_read
    @critical_section.lock
    if (@readers -= 1) == 0
      @are_readers_finished.broadcast
    end
    @critical_section.unlock
  end

  def write
    begin
      start_write
      yield
    ensure
      end_write
    end
  end

  def start_write
    @critical_section.lock
    @writers += 1
    while @readers > 0
      @are_readers_finished.wait(@critical_section)
    end
    while @writer_locked
      @are_writers_finished.wait(@critical_section)
    end
    @writers -= 1
    @writer_locked = true
    @critical_section.unlock
  end

  def end_write
    @critical_section.lock
    @writer_locked = false
    @are_writers_finished.broadcast
    @critical_section.unlock
  end
end

然后,只需换[]中lock.write和lock.read =和[]。 可能会影响性能,但会保证写将“打通”的读取。 这种有效性取决于它实际上是如何读重的。



文章来源: Pure-Ruby concurrent Hash