Python multiprocessing - How can I split workload

Posted 2020-03-27 18:15

Question:

I am writing a simple script that crops images and saves them.
The problem is that there are more than 150,000 images, and I want to improve the speed.

So, at first I wrote the code with a simple for loop, like the following:

import cv2
import numpy
import sys

textfile=sys.argv[1]
with open(textfile) as file_list:
    files=file_list.read().split('\n')
idx=0
for eachfile in files:
    image=cv2.imread(eachfile)
    idx+=1
    if image is None:
        continue  # unreadable file or empty line: skip it
    outName=eachfile.replace('/data','/changed_data')
    if image.shape[0]==256:
        image1=image[120:170,120:170]
    elif image.shape[0]==50:
        image1=image
    else:
        continue  # unexpected height: nothing to write
    cv2.imwrite(outName,image1)
    print("%d %s" % (idx,outName))

This code took about 38 seconds for 90,000 images. However, using two cores with the code below took longer than the single process: about 48 seconds for the same 90,000 images.

import cv2
import sys
import numpy
from multiprocessing import Pool

def crop(eachfile):
    # no idx counter here: a plain variable is not shared between worker processes
    image=cv2.imread(eachfile)
    if image is None:
        return  # unreadable file or empty line: skip it
    outName=eachfile.replace('/data','/changed_data')
    if image.shape[0]==256:
        image1=image[120:170,120:170]
    elif image.shape[0]==50:
        image1=image
    else:
        return  # unexpected height: nothing to write
    cv2.imwrite(outName,image1)
    print(outName)


if __name__=='__main__':
    textfile=sys.argv[1]
    with open(textfile) as file_list:
        files=file_list.read().split('\n')
    pool=Pool(2)
    pool.map(crop,files)
    pool.close()
    pool.join()

Am I doing the right thing for speeding up the process? Or should I split the list and send each list to the process?

Any comments regarding my code would be great!

Thanks in advance!!!

Answer 1:

You should indeed split the task over two cores. Play around with the example code below, which is slightly modified from the original (the original post can be found here). The data variable is your hook: that is where you supply your images. The functions don't work as methods of a class when using multiprocessing. If you're trying to use pathos, you'll get errors from cPickle; it is some nagging issue with the latest 2.7 version that doesn't occur in 3.5 or so. Enjoy!

import multiprocessing
import sys
import time

def mp_worker(args):
    inputs, the_time = args                 # each task is a [name, seconds] pair
    print(" Process %s\tWaiting %s seconds" % (inputs, the_time))
    time.sleep(int(the_time))
    print(" Process %s\tDONE" % inputs)
    sys.stdout.flush()

def mp_handler():                           # Non tandem pair processing
    p = multiprocessing.Pool(2)
    p.map(mp_worker, data)
    p.close()
    p.join()

def mp_handler_tandem():                    # Process the tasks two at a time
    subdata = zip(data[0::2], data[1::2])
#    print(list(subdata))
    p = multiprocessing.Pool(2)             # create the pool once and reuse it
    for task1, task2 in subdata:
        p.map(mp_worker, (task1, task2))
    p.close()
    p.join()

#data = (['a', '1'], ['b', '2'], ['c', '3'], ['d', '4'])
data = (['a', '2'], ['b', '3'], ['c', '1'], ['d', '4'],
        ['e', '1'], ['f', '2'], ['g', '3'], ['h', '4'])

if __name__ == '__main__':
    sys.stdout.flush()
#    print('mp_handler():')
#    mp_handler()
#    print('---')
#    time.sleep(2)

#    print('\nmp_handler_tandem():')
#    mp_handler_tandem()
    print('---')
#    time.sleep(2)

    mp_handler()                            # run the plain two-process version

When working within an editor, use sys.stdout.flush() so that your output appears on screen as it happens.
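To make the "hook" concrete, here is a minimal sketch of feeding the question's file list to the pool in place of data. It is an illustration under assumptions, not the original poster's code: the helper names output_path, read_file_list and process_all are made up, the worker is a stand-in that only computes the output path (the cv2 calls from the question would go where the comment indicates), and the chunksize of 500 is an arbitrary starting value to tune.

```python
import sys
from multiprocessing import Pool


def output_path(path):
    """Map an input path to its output path, as in the question."""
    return path.replace('/data', '/changed_data')


def read_file_list(textfile):
    """Return the non-empty lines of the list file."""
    with open(textfile) as file_list:
        return [line for line in file_list.read().split('\n') if line]


def crop(path):
    """Hypothetical stand-in worker; it does no image work itself."""
    # The cv2.imread / crop / cv2.imwrite steps from the question would go here.
    return output_path(path)


def process_all(files, workers=2, chunksize=500):
    """Feed the file list to a pool, sending paths to workers in batches."""
    pool = Pool(workers)
    try:
        # imap_unordered yields results as workers finish, in no fixed order,
        # so the results are sorted before being returned.
        return sorted(pool.imap_unordered(crop, files, chunksize))
    finally:
        pool.close()
        pool.join()


# Runs only when a list file is given on the command line.
if __name__ == '__main__' and len(sys.argv) > 1:
    for out_name in process_all(read_file_list(sys.argv[1])):
        print(out_name)
```

With many small tasks, a larger chunksize means fewer round trips between the parent and the workers; whether that helps here depends on how much of the time is spent on disk I/O rather than on cropping, so it is worth timing a few values.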

Also check here for an approach using kernels and splitting jobs.
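Since the question asks whether the list should be split and each part sent to its own process, the following is a rough sketch of that manual split. It is an assumption about what such a split could look like, not a reproduction of the linked material: split_list, handle_sublist and run_split are hypothetical names, and the per-file work is reduced to printing the output path.

```python
import sys
from multiprocessing import Process


def split_list(items, parts):
    """Split items into `parts` contiguous sublists whose lengths differ by at most one."""
    size, extra = divmod(len(items), parts)
    chunks = []
    start = 0
    for i in range(parts):
        end = start + size + (1 if i < extra else 0)
        chunks.append(items[start:end])
        start = end
    return chunks


def handle_sublist(paths):
    """Hypothetical per-process job: loop over one sublist, like the question's for loop."""
    for path in paths:
        out_name = path.replace('/data', '/changed_data')
        # The cv2.imread / crop / cv2.imwrite steps from the question would go here.
        print(out_name)
        sys.stdout.flush()


def run_split(files, workers=2):
    """Start one process per sublist and wait for all of them to finish."""
    procs = [Process(target=handle_sublist, args=(chunk,))
             for chunk in split_list(files, workers)]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()


# Runs only when a list file is given on the command line.
if __name__ == '__main__' and len(sys.argv) > 1:
    with open(sys.argv[1]) as file_list:
        files = [line for line in file_list.read().split('\n') if line]
    run_split(files)
```

Pool.map already divides its input into chunks internally, so a manual split like this mainly helps when each process needs its own setup or when you want explicit control over which files go where.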