Read, compress, write with multiprocessing

Published 2020-07-19 03:52

I'm compressing files. A single process is fine for a few of them, but I'm compressing thousands, and this can (and has) taken several days, so I'd like to speed it up with multiprocessing. I've read that I should avoid having multiple processes reading files at the same time, and I'm guessing I shouldn't have multiple processes writing at once either. This is my current method, which runs as a single process:

import tarfile, bz2, os

def compress(folder):
    "compresses a folder into a file"

    bz_file = bz2.BZ2File(folder + '.tbz', 'w')

    with tarfile.open(mode='w', fileobj=bz_file) as tar:

        for fn in os.listdir(folder):

            # read each file in the folder and do some pre-processing
            # that will make the compressed file much smaller than without,
            # then add the result:
            tar.addfile(processed_file)

    bz_file.close()
    return

This takes a folder and compresses all of its contents into a single file, which makes them easier to handle and keeps things organized. If I just tossed this into a pool, I'd have several processes reading and writing at once, which I want to avoid. I can rework it so that only one process reads the files, but I still end up with multiple processes writing:

import multiprocessing as mp
import tarfile, bz2, os

def compress(file_list):
    folder = file_list[0]
    bz_file = bz2.BZ2File(folder + '.tbz', 'w')

    with tarfile.open(mode='w', fileobj=bz_file) as tar:

        for i in file_list[1:]:
            # preprocess the file data, then add it to the archive
            tar.addfile(processed_data)

    bz_file.close()
    return

cpu_count = mp.cpu_count()
p = mp.Pool(cpu_count)

for subfolder in os.listdir(main_folder):

    # read all files in subfolder into memory, place into file_list;
    # collect file_lists into fld_list until it holds cpu_count of them,
    # then pass to p.map(compress, fld_list)

This still has a number of processes writing compressed files at once. Just the act of telling tarfile what kind of compression to use starts writing to the hard drive. I cannot read all the files I need to compress into memory, as I don't have that much RAM, so this approach also has the problem that I keep restarting Pool.map many times.

How can I read and write files in a single process, yet have all the compression in several processes, while avoiding restarting multiprocessing.Pool multiple times?

1 Answer
ゆ 、 Hurt°
#2 · 2020-07-19 03:55

Instead of using multiprocessing.Pool, one should use multiprocessing.Queue and create an inbox and an outbox.

Start a single process to read in the files and place the data into the inbox queue, and put a limit on the size of the queue so you don't end up filling your RAM. The example here compresses single files, but it can be adjusted to handle whole folders at once (a sketch of that adjustment follows the reader below).

import os

def reader(inbox, input_path, num_procs):
    "process that reads in files to be compressed and puts them in the inbox"

    for fn in os.listdir(input_path):
        path = os.path.join(input_path, fn)

        # read in each file, put its name and contents into the inbox
        with open(path, 'r') as src:
            lines = src.readlines()

        inbox.put([fn, lines])

    # everything has been read in; add a finished notice for every compressor
    for i in range(num_procs):
        inbox.put(None)  # when a compressor sees a None, it will stop
    inbox.close()
    return
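As mentioned above, this can be adjusted to handle whole folders at once by packaging every file of a subfolder into a single inbox item. A possible sketch (the folder_reader name and the [folder_name, members] layout are illustrative, not part of the original answer):

import os

def folder_reader(inbox, input_path, num_procs):
    "reads every file of each subfolder and puts one [folder_name, members] item per subfolder"
    for sub in os.listdir(input_path):
        sub_path = os.path.join(input_path, sub)
        if not os.path.isdir(sub_path):
            continue  # skip anything that isn't a folder

        members = []
        for fn in os.listdir(sub_path):
            with open(os.path.join(sub_path, fn), 'r') as src:
                members.append((fn, src.readlines()))

        inbox.put([sub, members])

    for i in range(num_procs):
        inbox.put(None)
    inbox.close()

The compressor would then loop over members, adding one TarInfo per file to the same archive; the writer stays unchanged.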

But that's only half of the problem; the other part is to compress the file without having to write it to disk. Instead of an open file we give the compression function an in-memory BytesIO object (tarfile needs a binary file object, so StringIO won't do in Python 3), and that is what gets passed to tarfile. Once compressed, we want to put the result into the outbox queue.

We can't put the BytesIO object itself into the queue, because only pickleable objects can go into a queue and BytesIO objects can't be pickled. However, its getvalue method returns the contents as plain bytes, which pickle fine, so grab the contents with getvalue, close the BytesIO object, and put the bytes into the outbox.

from io import BytesIO
import tarfile

def compressHandler(inbox, outbox):
    "process that pulls from the inbox, compresses, and puts to the outbox"
    supplier = iter(inbox.get, None)  # stops when it gets a None
    while True:
        try:
            data = next(supplier)     # grab data from the inbox
            pressed = compress(data)  # compress it
            outbox.put(pressed)       # put it into the outbox
        except StopIteration:
            outbox.put(None)  # finished compressing, inform the writer
            return            # and quit

def compress(data):
    "compress a single file's data in memory, return (name, compressed bytes)"
    bz_file = BytesIO()

    fname, lines = data  # see reader def for package order
    payload = ''.join(lines).encode('utf-8')

    with tarfile.open(mode='w:bz2', fileobj=bz_file) as tar:

        info = tarfile.TarInfo(fname)        # store the file name
        info.size = len(payload)             # tarfile needs the member size
        tar.addfile(info, BytesIO(payload))  # compress

    compressed = bz_file.getvalue()
    bz_file.close()
    return fname, compressed
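As a quick sanity check (a minimal sketch; the sample file name and contents are made up), the bytes that come back from compress can be opened again with tarfile to confirm the member round-trips:

import io, tarfile

sample = ['example.txt', ['line one\n', 'line two\n']]  # hypothetical [fname, lines] package
fname, blob = compress(sample)

# re-open the compressed bytes in memory and check the contents survived
with tarfile.open(mode='r:bz2', fileobj=io.BytesIO(blob)) as tar:
    restored = tar.extractfile(fname).read()
    assert restored == ''.join(sample[1]).encode('utf-8')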

The writer process then pulls the contents from the outbox queue and writes them to disk. This function needs to know how many compression processes were started so it only stops once it has heard from every one of them.

import os

def writer(outbox, output_path, num_procs):
    "single process that writes compressed files to disk"
    num_fin = 0

    while True:
        # stop once all compression processes have finished
        if num_fin >= num_procs:
            break

        tardata = outbox.get()

        # a compression process has finished
        if tardata is None:
            num_fin += 1
            continue

        fn, data = tardata
        name = os.path.join(output_path, fn) + '.tbz'

        with open(name, 'wb') as dst:
            dst.write(data)
    return

Finally, there's the setup to put it all together:

import multiprocessing as mp
import os

def setup():
    fld = 'file/path'

    # multiprocess setup
    num_procs = mp.cpu_count()

    # inbox and outbox queues
    inbox = mp.Queue(4 * num_procs)  # limit the size so the reader can't fill RAM
    outbox = mp.Queue()

    # one process to read
    reader_proc = mp.Process(target=reader, args=(inbox, fld, num_procs))
    reader_proc.start()

    # n processes to compress
    compressors = [mp.Process(target=compressHandler, args=(inbox, outbox))
                   for i in range(num_procs)]
    for c in compressors:
        c.start()

    # one process to write
    writer_proc = mp.Process(target=writer, args=(outbox, fld, num_procs))
    writer_proc.start()

    # wait for everything to finish
    reader_proc.join()
    for c in compressors:
        c.join()
    writer_proc.join()
    print('done!')
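A minimal sketch of kicking it off; the if __name__ == '__main__' guard matters because multiprocessing re-imports the module in the child processes on platforms that spawn rather than fork:

if __name__ == '__main__':
    setup()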