Function that multiprocesses another function

Published 2019-05-09 08:50

Question:

I'm performing analyses of time series from simulations. Basically, the same tasks are done for every time step. As there is a very high number of time steps, and as the analysis of each one is independent, I wanted to create a function that can multiprocess another function. The latter takes arguments and returns a result.

Using a shared dictionary and the concurrent.futures library, I managed to write this:

import concurrent.futures as Cfut
import multiprocessing as mlp

def multiprocess_loop_grouped(function, param_list, group_size, Nworkers, *args):
    # function : function that is run in parallel
    # param_list : list of items
    # group_size : size of the groups
    # Nworkers : number of groups/items running at the same time
    # *args : fixed parameters passed through to function

    manager = mlp.Manager()
    dic = manager.dict()
    executor = Cfut.ProcessPoolExecutor(Nworkers)

    futures = [executor.submit(function, param, dic, *args)
               for param in grouper(param_list, group_size)]

    Cfut.wait(futures)
    return [dic[i] for i in sorted(dic.keys())]
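
The grouper helper is not defined in the snippet above; presumably it is something like the classic itertools recipe that splits param_list into chunks of group_size. A minimal sketch of what such a helper could look like (its exact name and signature are only inferred from how it is called):

import itertools as it

def grouper(iterable, n):
    # yield successive chunks of n items; the last chunk may be shorter
    iterator = iter(iterable)
    while True:
        chunk = list(it.islice(iterator, n))
        if not chunk:
            return
        yield chunk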

Typically, I can use it like this:

def read_file(files, dictionary):
    for file in files:
        i = int(file[4:9])
        #print(str(i))
        if 'bz2' in file:
            os.system('bunzip2 ' + file)
            file = file[:-4]
        dictionary[i] = np.loadtxt(file)
        os.system('bzip2 ' + file)

Map = np.array(multiprocess_loop_grouped(read_file, list_alti, Group_size, N_thread))

or like this:

def autocorr(x):
    result = np.correlate(x, x, mode='full')
    return result[result.size//2:]

def find_lambda_finger(indexes, dic, Deviation):
    for i in indexes:
        #print(str(i))
        # Beach = Deviation[i,:] - np.mean(Deviation[i,:])
        dic[i] = Anls.find_first_max(autocorr(Deviation[i,:]), valmax=True)

args = [Deviation]
Temp = Rescal.multiprocess_loop_grouped(find_lambda_finger, range(Nalti), Group_size, N_thread, *args)

Basically, it works, but not well. Sometimes it crashes. Sometimes it actually launches a number of Python processes equal to Nworkers, and sometimes only 2 or 3 of them are running at a time even though I specified Nworkers = 15.

For example, a classic error I get is described in this other question I asked: Calling matplotlib AFTER multiprocessing sometimes results in error : main thread not in main loop

What is the most Pythonic way to achieve what I want? How can I improve the control of this function? How can I better control the number of running Python processes?

Answer 1:

One of the basic concepts of Python multiprocessing is using queues. It works quite well when you have an input list that can be iterated over and that does not need to be altered by the sub-processes. It also gives you good control over all the processes, because you spawn exactly the number you want, and you can keep them idle or stop them.

It is also a lot easier to debug. Sharing data explicitly is usually much more difficult to set up correctly.

Queues can hold almost anything, as long as it is picklable: you can fill them with filepath strings for reading files, numbers for doing calculations, or even images for drawing.

In your case, the layout could look like this:

import multiprocessing as mp
import numpy as np
import itertools as it


def worker1(in_queue, out_queue):
    # blocks when nothing is available, stops when 'STOP' is seen
    for a in iter(in_queue.get, 'STOP'):
        # do something with a to compute result
        out_queue.put({a: result})  # return your result linked to the input

def worker2(in_queue, out_queue):
    for a in iter(in_queue.get, 'STOP'):
        # do something differently
        out_queue.put({a: result})  # return your result linked to the input

def multiprocess_loop_grouped(function, param_list, group_size, Nworkers, *args):
    # your final result
    result = {}

    in_queue = mp.Queue()
    out_queue = mp.Queue()

    # fill your input
    for a in param_list:
        in_queue.put(a)
    # one stop sentinel per worker at the end of the input
    for n in range(Nworkers):
        in_queue.put('STOP')

    # set up your worker processes doing the task as specified
    processes = [mp.Process(target=function,
                            args=(in_queue, out_queue), daemon=True)
                 for x in range(Nworkers)]

    # run processes
    for p in processes:
        p.start()

    # collect your results from the calculations
    # (drain the output queue before joining, otherwise a full queue can block the workers)
    for a in param_list:
        result.update(out_queue.get())

    # wait for processes to finish
    for p in processes:
        p.join()

    return result

temp = multiprocess_loop_grouped(worker1, param_list, group_size, Nworkers, *args)
map = multiprocess_loop_grouped(worker2, param_list, group_size, Nworkers, *args)
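
As an illustration, the read_file task from the question could be adapted to this worker signature. This is only a sketch reusing the question's own file-handling logic; here each queue item is a single file name rather than a group of them:

import os
import numpy as np

def read_file_worker(in_queue, out_queue):
    # pulls file names until the 'STOP' sentinel is seen
    for file in iter(in_queue.get, 'STOP'):
        i = int(file[4:9])
        if 'bz2' in file:
            os.system('bunzip2 ' + file)
            file = file[:-4]
        out_queue.put({i: np.loadtxt(file)})  # result keyed by the time-step index
        os.system('bzip2 ' + file)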

It can be made a bit more dynamic if you are afraid that your queues will run out of memory. Then you need to fill and empty the queues while the processes are running. See this example here.
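
A sketch of such a more dynamic variant, assuming the same worker functions and 'STOP' sentinel as above; the max_queued cap is an arbitrary illustration value, not something prescribed:

import itertools as it
import multiprocessing as mp

def multiprocess_loop_streaming(function, param_list, Nworkers, max_queued=100):
    # keeps the queues small by feeding inputs and draining outputs
    # while the workers are running
    in_queue = mp.Queue()
    out_queue = mp.Queue()

    workers = [mp.Process(target=function,
                          args=(in_queue, out_queue), daemon=True)
               for _ in range(Nworkers)]
    for p in workers:
        p.start()

    result = {}
    pending = iter(param_list)

    # prime the input queue with at most max_queued items
    for a in it.islice(pending, max_queued):
        in_queue.put(a)

    # for every result collected, push the next pending input
    for _ in range(len(param_list)):
        result.update(out_queue.get())
        try:
            in_queue.put(next(pending))
        except StopIteration:
            pass

    # tell the workers to stop, then wait for them
    for _ in range(Nworkers):
        in_queue.put('STOP')
    for p in workers:
        p.join()

    return result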

Final words: it is not more Pythonic, as you requested, but it is easier to understand for a newbie ;-)