I have a large file (hundreds of megs) that consists of filenames, one per line.
I need to loop through the list of filenames, and fork off a process for each filename. I want a maximum of 8 forked processes at a time and I don't want to read the whole filename list into RAM at once.
I'm not even sure where to begin. Can anyone help me out?
File.foreach("large_file").each_slice(8) do |eight_lines|
  # eight_lines is an array containing up to 8 lines.
  # At this point you can iterate over these filenames
  # and spawn off your processes/threads.
end
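Filling in that inner loop, a batched version might look something like the sketch below; process_file.sh is just a hypothetical stand-in for whatever you actually want to run against each filename:
# Sketch: handle the file in batches of up to 8 filenames at a time.
File.foreach("large_file").each_slice(8) do |batch|
  pids = batch.map do |line|
    filename = line.chomp
    Process.fork { exec("process_file.sh", filename) } # hypothetical command
  end
  pids.each { |pid| Process.waitpid(pid) } # wait for the whole batch before starting the next
end
One caveat: this waits for the slowest job in each batch before starting the next batch, so it won't always keep 8 processes busy; the wait/waitall approach below keeps the slots fuller.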
It sounds like the Process module will be useful for this task. Here's something I quickly threw together as a starting point:
include Process

i = 0
for line in open('files.txt') do
  i += 1
  fork { `sleep #{rand} && echo "#{i} - #{line.chomp}" >> numbers.txt` }
  if i >= 8
    wait # join any single child process
    i -= 1
  end
end
waitall # join all remaining child processes
Given a files.txt containing:
hello
goodbye
test1
test2
a
b
c
d
e
f
g
the output is:
$ ruby b.rb
$ cat numbers.txt
1 - hello
3 -
2 - goodbye
5 - test2
6 - a
4 - test1
7 - b
8 - c
8 - d
8 - e
8 - f
8 - g
The way this works:
- for line in open(XXX) will lazily iterate over the lines of the file you specify, so the whole list never has to be read into RAM at once.
- fork spawns a child process that runs the given block; here the backticks hand a command line to the shell. Note that rand with no argument returns a value between 0 and 1, so each child sleeps for less than a second, and line.chomp strips the trailing newline that comes with line.
- If we've accumulated 8 or more processes, wait blocks until one of them exits.
- Finally, outside the loop, waitall joins all remaining child processes before the script exits.
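Applied to the actual question (one child per filename, never more than 8 at once), the same pattern could look roughly like this; process_file.sh is again a hypothetical placeholder for the real work:
# Sketch: stream filenames and keep at most 8 children running at a time.
MAX_CHILDREN = 8
active = 0

File.foreach('files.txt') do |line|
  filename = line.chomp
  Process.fork { exec('process_file.sh', filename) } # hypothetical per-file command
  active += 1
  if active >= MAX_CHILDREN
    Process.wait # block until any one child exits
    active -= 1
  end
end
Process.waitall # reap whatever is still running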
Here's Mark's solution wrapped up as a ProcessPool class; it might be handy to have around (and please correct me if I've made a mistake):
class ProcessPool
  def initialize(pool_size)
    @pool_size = pool_size
    @free_slots = @pool_size
  end

  def fork(&p)
    if @free_slots == 0
      Process.wait
      @free_slots += 1
    end
    @free_slots -= 1
    puts "Free slots: #{@free_slots}"
    Process.fork(&p)
  end

  def waitall
    Process.waitall
  end
end

pool = ProcessPool.new 8
for line in open('files.txt') do
  pool.fork { Kernel.sleep rand(10); puts line.chomp }
end
pool.waitall
puts 'finished'
The standard library documentation for Queue has this example:
require 'thread'

queue = Queue.new

producer = Thread.new do
  5.times do |i|
    sleep rand(i) # simulate expense
    queue << i
    puts "#{i} produced"
  end
end

consumer = Thread.new do
  5.times do |i|
    value = queue.pop
    sleep rand(i/2) # simulate expense
    puts "consumed #{value}"
  end
end

consumer.join
I do find it a little verbose though.
Wikipedia describes this as the thread pool pattern.
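For this particular problem, a Queue-based thread pool could look something like the sketch below: 8 worker threads pull filenames from a bounded queue, and each spawns and waits on one child process at a time (process_file.sh is, again, a hypothetical placeholder):
require 'thread'

WORKERS = 8
queue = SizedQueue.new(WORKERS * 2) # bounded, so the reader never runs far ahead of the workers

workers = WORKERS.times.map do
  Thread.new do
    while (filename = queue.pop)
      # Each worker runs one child at a time, so at most 8 children exist at once.
      pid = Process.spawn('process_file.sh', filename) # hypothetical command
      Process.waitpid(pid)
    end
  end
end

File.foreach('files.txt') { |line| queue << line.chomp }
WORKERS.times { queue << nil } # one stop marker per worker
workers.each(&:join)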
arr = IO.readlines("filename") # note: this reads the whole file into memory at once