Simple Multitasking

Posted 2019-07-23 22:36

Question:

So I have a bunch of functions that don't depend on each other to do their stuff, and each of them takes quite some time. So I thought I would save runtime if I could use multiple threads. For example:

axial_velocity = calc_velocity(data_axial, factors_axial)
radial_velocity = calc_velocity(data_radial, factors_radial)
circumferential_velocity = calc_velocity(data_circ, factors_circ)

All my variables so far are lists (pretty long lists, too).

I have to do this for every input file, and it takes hours if there are more than 200... (I expect about 1000+)

To reduce the runtime I tried to check and compute the data as little as possible (especially the sanity checks), which helped greatly, but the next improvement would be to use one thread for each set of data.

I've tried something like this (oversimplified):

from multiprocessing import Pool

def calc_velocity(data, factor):
    buffer_list = []
    for index, line in enumerate(data):
        buffer_list.append(data[index] * factor[index])
    return buffer_list

data_axial = [1, 2, 3]
factors_axial = [3, 2, 1]

if __name__ == '__main__':
    p = Pool(4)
    axial_velocity = p.map(calc_velocity, args = (data_axial, factors_axial))

and:

from multiprocessing import Process


def calc_velocity(data_pack):
    data = []
    factor = []
    data.extend(data_pack[0])
    factor.extend(data_pack[1])
    buffer_list = []
    for index, line in enumerate(data):
        buffer_list.append(data[index] * factor[index])
    return buffer_list


data_axial = [1, 2, 3]
factors_axial = [3, 2, 1]

if __name__ == '__main__':
    data_pack = []
    data_pack.append(data_axial)
    data_pack.append(factors_axial)
    p = Process(target = calc_velocity, args = data_pack)
    p.start()
    p.join()
    print(p)

None of these work, and I can't figure out how to make them work.

Answer 1:

If you don't need the results the instant each one completes, a simple multiprocessing.Pool.map() is more than enough to split your task into separate processes that run in parallel, e.g.:

import multiprocessing

def worker(args):  # a worker function invoked for each sub-process
    data, factor = args[0], args[1]  # Pool.map() sends a single argument so unpack them
    return [e * factor[i] for i, e in enumerate(data)]

if __name__ == "__main__":  # important process guard for cross-platform use
    calc_pool = multiprocessing.Pool(processes=3)  # we only need 3 processes
    data = (  # pack our data for multiprocessing.Pool.map() ingestion
        (data_axial, factors_axial),
        (data_radial, factors_radial),
        (data_circ, factors_circ)
    )
    # run our processes and await responses
    axial_velocity, radial_velocity, circumferential_velocity = calc_pool.map(worker, data)

However, the concerning part of your question is the hint that you have a large amount of data to pass around. When Python uses multiprocessing it doesn't share its memory between processes, and while on systems with fork() it can use copy-on-write optimization, explicitly passing data between processes always invokes an extremely slow pickle/unpickle routine to pack and send the data.

For that reason, make sure the amount of data you exchange is minimal - for example, if you are loading data_axial and factors_axial from a file, it is better to send just the file path(s) and let the worker() process load/parse the file(s) itself than to load the files in your main process and then send the loaded data over.
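
A minimal sketch of that idea, assuming each input file can be parsed into a data list and a factors list (the load_arrays() helper and the file names below are illustrative, not part of the original question):

import multiprocessing

def load_arrays(path):
    # Hypothetical parser: read one input file into (data, factors).
    # Adapt this to however your files are actually structured.
    data, factors = [], []
    with open(path) as fh:
        for line in fh:
            d, f = line.split()
            data.append(float(d))
            factors.append(float(f))
    return data, factors

def worker(path):
    # Only the short path string is pickled and sent to the sub-process;
    # the heavy loading and parsing happens inside the worker itself.
    data, factors = load_arrays(path)
    return [d * f for d, f in zip(data, factors)]

if __name__ == '__main__':
    paths = ['run_001.dat', 'run_002.dat']  # placeholder file names
    pool = multiprocessing.Pool(processes=4)
    velocities = pool.map(worker, paths)  # one result list per input file
    pool.close()
    pool.join()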

If you need to frequently (and randomly) access large amounts of (mutable) shared data in your sub-processes, I'd suggest using some in-memory database for the task, such as Redis.
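
For illustration only, a rough sketch of that approach using the third-party redis package (the package, the locally running Redis server, and the key names are all assumptions, not part of the question):

import json
import multiprocessing
import redis

def worker(key):
    r = redis.Redis()                       # each process opens its own connection
    data = json.loads(r.get(key))           # read the shared input from Redis
    result = [value * 2 for value in data]  # placeholder computation
    r.set(key + ':result', json.dumps(result))

if __name__ == '__main__':
    r = redis.Redis()
    r.set('axial', json.dumps([1, 2, 3]))   # publish the shared data once
    p = multiprocessing.Process(target=worker, args=('axial',))
    p.start()
    p.join()
    print(json.loads(r.get('axial:result')))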



Answer 2:

When I want to do this kind of multitasking in Python, I use threads. The following code is an example of using threads in Python:

from threading import Thread
import time

def time1(a, b):
    print(a)
    time.sleep(10)
    print(time.time(), a)
    return b

def time2(c, d):
    print(c)
    time.sleep(10)
    print(time.time(), c)
    return d

if __name__ == '__main__':
    # target: the function name (pointer),
    # args: a tuple of the arguments that you want to send to your function
    t1 = Thread(target=time1, args=(1, 2))
    t2 = Thread(target=time2, args=(3, 4))

    # start the functions:
    a = t1.start()
    b = t2.start()
    print(a)  # Thread.start() returns None, so nothing useful is captured here
    print(b)

As you can see in this code, threads can't return a value directly. There are two ways to get a value out of a thread: you can write the output to a file and then read that file back (in a try/except block), or you can change a global value to what you want to return. If you still want the return value from a thread, you can find some help here: how to get the return value from a thread in python?
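
As a rough sketch of the second idea (the results dictionary and the argument values below are only illustrative):

from threading import Thread

results = {}  # shared structure the threads write their output into

def calc_velocity(key, data, factors):
    # store the result under a key instead of returning it
    results[key] = [d * f for d, f in zip(data, factors)]

if __name__ == '__main__':
    t1 = Thread(target=calc_velocity, args=('axial', [1, 2, 3], [3, 2, 1]))
    t2 = Thread(target=calc_velocity, args=('radial', [4, 5, 6], [6, 5, 4]))
    t1.start()
    t2.start()
    t1.join()  # wait for both threads to finish before reading the results
    t2.join()
    print(results['axial'], results['radial'])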

Hope that was helpful.



Answer 3:

Your first example is almost there. However, Pool.map() doesn't take an args keyword argument. Additionally, Pool.map() only lets you pass a single argument to the function; to pass multiple arguments, you have to pack them into another structure, such as a tuple, as you did in your second example.

This modified version of your first example works.

from multiprocessing import Pool

def calc_velocity(work_args):
    buffer_list = []
    for index, line in enumerate(work_args[0]):
        buffer_list.append(work_args[0][index] * work_args[1][index])
    return buffer_list

data_axial = [1, 2, 3]
factors_axial = [3, 2, 1]

if __name__ == '__main__':
    p = Pool(4)
    work_args = (data_axial, factors_axial)
    axial_velocity = p.map(calc_velocity, [work_args])

If the calc_velocity function is actually representative of your function, then you could use numpy's multiply function to make it easier (and faster). Your calc_velocity function would just be:

import numpy

def calc_velocity(work_args):
    return numpy.multiply(work_args[0], work_args[1])
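
Note that numpy.multiply returns a NumPy array rather than a plain Python list; if the rest of your code expects lists, call .tolist() on the result.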