I'm compressing files. A single process is fine for a few of them, but I'm compressing thousands, which can (and has) taken several days, so I'd like to speed it up with multiprocessing. I've read that I should avoid having multiple processes reading files at the same time, and I'm guessing I shouldn't have multiple processes writing at once either. This is my current method, which runs in a single process:
import tarfile, bz2, os
def compress(folder):
    "compresses a folder into a file"
    bz_file = bz2.BZ2File(folder + '.tbz', 'w')
    with tarfile.open(mode='w', fileobj=bz_file) as tar:
        for fn in os.listdir(folder):
            # read each file in the folder and do some preprocessing
            # that will make the compressed file much smaller than without
            tar.addfile( processed file )
    bz_file.close()
    return
This takes a folder and compresses all of its contents into a single file, which makes them easier to handle and more organized. If I just tossed this into a pool, I'd have several processes reading and writing all at once, which is what I want to avoid. I can rework it so that only one process reads the files, but I still have multiple processes writing:
import multiprocessing as mp
import tarfile, bz2, os
def compress(file_list):
    folder = file_list[0]
    bz_file = bz2.BZ2File(folder + '.tbz', 'w')
    with tarfile.open(mode='w', fileobj=bz_file) as tar:
        for i in file_list[1:]:
            # preprocess file data
            tar.addfile(processed data)
    bz_file.close()
    return

cpu_count = mp.cpu_count()
p = mp.Pool(cpu_count)
for subfolder in os.listdir(main_folder):
    # read all files in subfolder into memory, place into file_list
    # place file_list into fld_list until fld_list contains cpu_count
    # file lists, then pass to p.map(compress, fld_list)
This still has a number of processes writing compressed files at once; just the act of telling tarfile what kind of compression to use starts writing to the hard drive. I also can't read all the files I need to compress into memory, since I don't have that much RAM, and this approach has the further problem that I'm restarting Pool.map many times.
How can I read and write files in a single process, yet have all the compression in several processes, while avoiding restarting multiprocessing.Pool multiple times?
Instead of using multiprocessing.Pool, one should use multiprocessing.Queue and create an inbox and an outbox. Start a single process to read in the files and place the data into the inbox queue, and put a limit on the size of the queue so you don't end up filling your RAM. The example here compresses single files, but it can be adjusted to handle whole folders at once.
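Here is a minimal sketch of that reader process, assuming a sentinel value (None) is used to tell each compression worker when to stop; the names reader, inbox, and n_workers are illustrative, not from the original:

import os

def reader(main_folder, inbox, n_workers):
    # the only process that reads from disk
    for fn in os.listdir(main_folder):
        with open(os.path.join(main_folder, fn), 'rb') as f:
            data = f.read()
        # put() blocks when the queue is at its maxsize, so RAM use stays bounded
        inbox.put((fn, data))
    # one sentinel per compression worker so each one knows the reading is done
    for _ in range(n_workers):
        inbox.put(None)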
But that's only half of the question; the other part is to compress the file without having to write it to disk. We give a StringIO object to the compression function instead of an open file; it is passed to tarfile. Once compressed, we put the StringIO object into the outbox queue. Except we can't do that, because StringIO objects can't be pickled, and only picklable objects can go into a queue. However, the getvalue function of StringIO gives the contents in a picklable format, so grab the contents with getvalue, close the StringIO object, and then put the contents into the outbox. The writer process then takes the contents from the outbox queue and writes them to disk. This process needs to know how many compression processes were started so it stops only once it has heard that every one of them has finished.
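A sketch of a compression worker and the writer process along those lines, using io.BytesIO, which plays the role of the StringIO object described above for binary data in Python 3 (function and queue names are again illustrative, and the per-file preprocessing is left out):

import io
import tarfile

def compress_worker(inbox, outbox):
    # compresses entirely in memory; never touches the disk
    while True:
        item = inbox.get()
        if item is None:                   # sentinel from the reader
            outbox.put(None)               # tell the writer this worker is finished
            break
        name, data = item
        buf = io.BytesIO()
        with tarfile.open(mode='w:bz2', fileobj=buf) as tar:
            info = tarfile.TarInfo(name=name)
            info.size = len(data)
            tar.addfile(info, io.BytesIO(data))
        # getvalue() returns plain bytes, which are picklable and can go into the queue
        outbox.put((name + '.tbz', buf.getvalue()))
        buf.close()

def writer(outbox, n_workers):
    # the only process that writes to disk; stops once every worker has finished
    finished = 0
    while finished < n_workers:
        item = outbox.get()
        if item is None:
            finished += 1
            continue
        out_name, payload = item
        with open(out_name, 'wb') as f:
            f.write(payload)

Opening the tar with mode='w:bz2' lets tarfile apply the bz2 compression itself, which avoids wrapping a BZ2File around the buffer by hand; the effect is the same as in the question's code.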
Finally, there's the setup to put them all together:
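A sketch of that setup, under the same assumptions as the snippets above (the queue sizes and the 'main_folder' name are placeholders):

import multiprocessing as mp

if __name__ == '__main__':
    n_workers = max(1, mp.cpu_count() - 2)   # leave cores free for the reader and writer
    inbox = mp.Queue(maxsize=2 * n_workers)  # cap queued file data to bound RAM use
    outbox = mp.Queue(maxsize=2 * n_workers)

    workers = [mp.Process(target=compress_worker, args=(inbox, outbox))
               for _ in range(n_workers)]
    writer_proc = mp.Process(target=writer, args=(outbox, n_workers))

    for w in workers:
        w.start()
    writer_proc.start()

    reader('main_folder', inbox, n_workers)  # reading happens in the main process

    for w in workers:
        w.join()
    writer_proc.join()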