Multiprocessing Pool with a for loop

Posted 2019-06-20 15:58

Question:

I have a list of files that I pass into a for loop, running a whole bunch of functions on each one. What's the easiest way to parallelize this? I couldn't find this exact case anywhere, and I think my current implementation is incorrect because I only saw one file being run. From the reading I've done, I believe this should be a perfectly parallel case.

Old code is something like this:

import pandas as pd
filenames = ['file1.csv', 'file2.csv', 'file3.csv', 'file4.csv']
for file in filenames:
    file1 = pd.read_csv(file)
    print('running ' + str(file))
    a = function1(file1)
    b = function2(a)
    c = function3(b)
    for d in range(1, 6):
        e = function4(c, d)
    c.to_csv('output.csv')

The (incorrectly) parallelized code:

import pandas as pd
from multiprocessing import Pool
filenames = ['file1.csv', 'file2.csv', 'file3.csv', 'file4.csv']
def multip(filenames):
    file1 = pd.read_csv(file)
    print('running ' + str(file))
    a = function1(file1)
    b = function2(a)
    c = function3(b)
    for d in range(1, 6):
        e = function4(c, d)
    c.to_csv('output.csv')

if __name__ == '__main__':
    pool = Pool(processes=4)
    runstuff = pool.map(multip(filenames))

What I (think) I want to do is have one file computed per core (maybe per process?). I also ran

multiprocessing.cpu_count()

and got 8 (I have a quad-core, so it's probably counting hyperthreads). Since I have around 10 files total, if I could put one file per process to speed things up, that would be great! I would also hope the remaining 2 files would find a process after the first round of processes completes.
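
That is in fact how Pool schedules work: each worker pulls the next pending task as soon as it finishes its current one. A quick standalone demo makes this visible (the work function is a stand-in of mine, not from the code above): ten one-second tasks on eight workers, where the last two tasks go to whichever workers free up first.

import multiprocessing
import os
import time

def work(i):
    time.sleep(1)              # stand-in for one file's worth of processing
    return i, os.getpid()      # report which worker process ran this task

if __name__ == '__main__':
    with multiprocessing.Pool(processes=8) as pool:
        # 10 tasks on 8 workers: the first 8 start immediately and the
        # last 2 are handed to whichever workers free up first
        for i, pid in pool.map(work, range(10)):
            print('task', i, 'ran in process', pid)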

Edit: for further clarity, the functions (i.e. function1, function2, etc.) also feed into other functions (e.g. function1a, function1b) inside their respective files. I call function1 using an import statement.

I get the following error:

OSError: Expected file path name or file-like object, got <class 'list'> type

Apparently it doesn't like being passed a list, but I don't want to do filenames[0] in the if statement because that only runs one file.
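
For context, the immediate cause of that error: pool.map takes the function and the iterable as two separate arguments, so pool.map(multip(filenames)) evaluates multip(filenames) right away, handing the whole list into a single call (and on to read_csv). A minimal corrected sketch of the code above; the mymodule import and the per-file output name are placeholders of mine, since every process writing the same output.csv would overwrite the others:

import pandas as pd
from multiprocessing import Pool
# function1..function4 live in the asker's own modules; the module
# name here is a placeholder
from mymodule import function1, function2, function3, function4

filenames = ['file1.csv', 'file2.csv', 'file3.csv', 'file4.csv']

def multip(file):
    file1 = pd.read_csv(file)
    print('running ' + str(file))
    a = function1(file1)
    b = function2(a)
    c = function3(b)
    for d in range(1, 6):
        e = function4(c, d)
    # one output per input file (my assumption; a single shared
    # 'output.csv' would be clobbered by each process)
    c.to_csv('output_' + file)

if __name__ == '__main__':
    pool = Pool(processes=4)
    # function first, iterable second: map calls multip once per filename
    pool.map(multip, filenames)
    pool.close()
    pool.join()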

Answer 1:

import multiprocessing
import time
names = ['file1.csv', 'file2.csv']
def multip(name):
    pass  # [do stuff here]

if __name__ == '__main__':
    #use one less process to be a little more stable
    p = multiprocessing.Pool(processes = multiprocessing.cpu_count()-1)
    #timing it...
    start = time.time()
    for file in names:
        p.apply_async(multip, [file])

    p.close()
    p.join()
    print("Complete")
    end = time.time()
    print('total time (s)= ' + str(end-start))
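
One caveat with apply_async: exceptions raised inside multip are silently dropped unless you keep the AsyncResult handles it returns and call .get() on them. A small sketch of that pattern, with a stand-in multip:

import multiprocessing

names = ['file1.csv', 'file2.csv']

def multip(name):
    return name  # stand-in worker so there is something to collect

if __name__ == '__main__':
    p = multiprocessing.Pool(processes=max(1, multiprocessing.cpu_count() - 1))
    handles = [p.apply_async(multip, [name]) for name in names]
    p.close()
    p.join()
    for h in handles:
        # .get() returns the worker's return value and re-raises any
        # exception raised inside the worker, which join() alone hides
        print(h.get())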

EDIT: Swap out the if __name__ == '__main__' block for this one. This runs all the files:

if __name__ == '__main__':
    p = multiprocessing.Pool(processes=len(names))
    start = time.time()
    async_result = p.map_async(multip, names)
    p.close()
    p.join()
    print("Complete")
    end = time.time()
    print('total time (s)= ' + str(end-start))
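
map_async also returns an AsyncResult, so if multip returns anything (or raises), you can collect it after join() with .get(). A small self-contained sketch, again with a stand-in multip:

import multiprocessing

names = ['file1.csv', 'file2.csv']

def multip(name):
    return name  # stand-in worker

if __name__ == '__main__':
    p = multiprocessing.Pool(processes=len(names))
    async_result = p.map_async(multip, names)
    p.close()
    p.join()
    # .get() returns the list of return values, in order, and re-raises
    # any worker exception instead of failing silently
    print(async_result.get())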