Reading a file N lines at a time in ruby

2019-04-17 18:11发布

问题:

I have a large file (hundreds of megs) that consists of filenames, one per line.

I need to loop through the list of filenames, and fork off a process for each filename. I want a maximum of 8 forked processes at a time and I don't want to read the whole filename list into RAM at once.

I'm not even sure where to begin, can anyone help me out?

回答1:

File.foreach("large_file").each_slice(8) do |eight_lines|
  # eight_lines is an array containing 8 lines.
  # at this point you can iterate over these filenames
  # and spawn off your processes/threads
end


回答2:

It sounds like the Process module will be useful for this task. Here's something I quickly threw together as a starting point:

include Process

i = 0
for line in open('files.txt') do
    i += 1
    fork { `sleep #{rand} && echo "#{i} - #{line.chomp}" >> numbers.txt` }

    if i >= 8
        wait # join any single child process
        i -= 1
    end
end

waitall # join all remaining child processes

Output:

hello
goodbye

test1
test2
a
b
c
d
e
f
g
$ ruby b.rb
$ cat numbers.txt 
1 - hello
3 - 
2 - goodbye
5 - test2
6 - a
4 - test1
7 - b
8 - c
8 - d
8 - e
8 - f
8 - g

The way this works is that:

  • for line in open(XXX) will lazily iterate over the lines of the file you specify.
  • fork will spawn a child process executing the given block, and in this case, we use backticks to indicate something to be executed by the shell. Note that rand returns a value 0-1 here so we are sleeping less than a second, and I call line.chomp to remove the trailing newline that we get from line.
  • If we've accumulated 8 or more processes, call wait to stop everything until one of them returns.
  • Finally, outside the loop, call waitall to join all remaining processes before exiting the script.


回答3:

Here's Mark's solution wrapped up as a ProcessPool class, might be helpful to have it around (and please correct me if I made some mistake):

class ProcessPool
  def initialize pool_size
    @pool_size = pool_size
    @free_slots = @pool_size
  end

  def fork &p
    if @free_slots == 0
      Process.wait
      @free_slots += 1
    end
    @free_slots -= 1
    puts "Free slots: #{@free_slots}"
    Process.fork &p
  end

  def waitall
    Process.waitall
  end
end

pool = ProcessPool.new 8
for line in open('files.txt') do
  pool.fork { Kernel.sleep rand(10); puts line.chomp }
end
pool.waitall
puts 'finished'


回答4:

The standard library documentation for Queue has

require 'thread'

queue = Queue.new

producer = Thread.new do
  5.times do |i|
    sleep rand(i) # simulate expense
    queue << i
    puts "#{i} produced"
  end
end

consumer = Thread.new do
  5.times do |i|
    value = queue.pop
    sleep rand(i/2) # simulate expense
    puts "consumed #{value}"
  end
end

consumer.join

I do find it a little verbose though.

Wikipedia describes this as a thread pool pattern



回答5:

arr = IO.readlines("filename")