Question:
So I have a bunch of functions that don't depend on each other to do their work, and each of them takes quite some time. So I thought I would save runtime if I could use multiple threads. For example:
axial_velocity = calc_velocity(data_axial, factors_axial)
radial_velocity = calc_velocity(data_radial, factors_radial)
circumferential_velocity = calc_velocity(data_circ, factors_circ)
All my variables so far are lists (pretty long lists too)
I have to do this for every input file, and this takes hours if there are more than 200... (I expect about 1000+)
To reduce the runtime I tried to recompute the data as little as possible (especially the sanity checks), which helped greatly, but the next improvement would be using one thread for each set of data.
I've tried something like this (oversimplified):
from multiprocessing import Pool

def calc_velocity(data, factor):
    buffer_list = []
    for index, line in enumerate(data):
        buffer_list.append(data[index] * factor[index])
    return buffer_list

data_axial = [1, 2, 3]
factors_axial = [3, 2, 1]

if __name__ == '__main__':
    p = Pool(4)
    axial_velocity = p.map(calc_velocity, args = (data_axial, factors_axial))
and:
from multiprocessing import Process

def calc_velocity(data_pack):
    data = []
    factor = []
    data.extend(data_pack[0])
    factor.extend(data_pack[1])

    buffer_list = []
    for index, line in enumerate(data):
        buffer_list.append(data[index] * factor[index])
    return buffer_list

data_axial = [1, 2, 3]
factors_axial = [3, 2, 1]

if __name__ == '__main__':
    data_pack = []
    data_pack.append(data_axial)
    data_pack.append(factors_axial)

    p = Process(target = calc_velocity, args = data_pack)
    p.start()
    p.join()
    print p
None of these work, and I can't figure out how to make them work.
Answer 1:
If you don't need the results as soon as they are completed, a simple multiprocessing.Pool.map() is more than enough to separate your task into separate processes that run in parallel, e.g.:
import multiprocessing

def worker(args):  # a worker function invoked for each sub-process
    data, factor = args[0], args[1]  # Pool.map() sends a single argument so unpack it
    return [e * factor[i] for i, e in enumerate(data)]

if __name__ == "__main__":  # important process guard for cross-platform use
    calc_pool = multiprocessing.Pool(processes=3)  # we only need 3 processes
    data = (  # pack our data for multiprocessing.Pool.map() ingestion
        (data_axial, factors_axial),
        (data_radial, factors_radial),
        (data_circ, factors_circ)
    )
    # run our processes and await responses
    axial_velocity, radial_velocity, circumferential_velocity = calc_pool.map(worker, data)
However, the concerning part of your question lies in the hint that you have a large amount of data to pass around - when Python uses multiprocessing it doesn't share its memory, and while (at least on systems with fork) it can use copy-on-write optimization, passing data between processes always invokes an extremely slow pickle/unpickle routine to pack and send the data.
For that reason, make sure the amount of data you exchange is minimal - for example, if you were loading data_axial and factors_axial from a file, it's better to send just the file path(s) and let the worker() process load/parse the file(s) itself than to load the file in your main process and then send over the loaded data.
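For illustration, here is a minimal sketch of that idea - the file names and the simple two-column file format (one "value factor" pair per line) are assumptions for the example, not something taken from your question:

import multiprocessing

def parse_file(path):
    # assumed file format: one "data factor" pair per line, whitespace separated
    data, factors = [], []
    with open(path) as f:
        for line in f:
            d, fac = line.split()
            data.append(float(d))
            factors.append(float(fac))
    return data, factors

def worker(path):
    # the worker loads and parses its own file, so only the short
    # path string has to be pickled and sent to the sub-process
    data, factors = parse_file(path)
    return [d * fac for d, fac in zip(data, factors)]

if __name__ == "__main__":
    paths = ["axial.dat", "radial.dat", "circ.dat"]  # assumed file names
    calc_pool = multiprocessing.Pool(processes=3)
    axial_velocity, radial_velocity, circumferential_velocity = calc_pool.map(worker, paths)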
If you need to frequently (and randomly) access large amounts of (mutable) shared data in your sub-processes, I'd suggest you to use some in-memory database for the task, like Redis.
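As a rough sketch of that idea (assuming the redis-py package is installed and a Redis server is running locally - neither is part of your setup as described):

import multiprocessing
import redis  # assumes the redis-py package and a locally running Redis server

def worker(index):
    r = redis.Redis()                        # each sub-process opens its own connection
    factor = float(r.get("shared_factor"))   # read the shared value instead of pickling it per task
    return index * factor

if __name__ == "__main__":
    redis.Redis().set("shared_factor", 2.5)  # publish the shared value once
    pool = multiprocessing.Pool(processes=4)
    print(pool.map(worker, range(10)))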
Answer 2:
When I want to do multiprocessing in Python I use threads; the following code is an example of using threads in Python:
from threading import Thread
import time

def time1(a, b):
    print a
    time.sleep(10)
    print time.time(), a
    return b

def time2(c, d):
    print c
    time.sleep(10)
    print time.time(), c
    return d

if __name__ == '__main__':
    # target: the function name (pointer),
    # args: a tuple of the arguments that you want to send to your function
    t1 = Thread(target = time1, args=(1, 2))
    t2 = Thread(target = time2, args=(3, 4))
    # start the functions:
    a = t1.start()
    b = t2.start()
    print a
    print b
As you can see in this code, threads can't return a value directly, so there are two ways to get a value out of a thread: you can write the output into a file and then read the file back (inside a try/except block), or you can change a global value to what you want to return. If you still want to get a return value from the thread, you can find some help here: how to get the return value from a thread in python?
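For example, a small sketch of the second option (the shared results dict and the sample lists here are just illustrative):

from threading import Thread

results = {}  # shared dict the threads write their results into

def calc_velocity(name, data, factor):
    # store the result under a key instead of returning it,
    # since start()/join() don't hand back the function's return value
    results[name] = [d * f for d, f in zip(data, factor)]

if __name__ == '__main__':
    t1 = Thread(target=calc_velocity, args=('axial', [1, 2, 3], [3, 2, 1]))
    t2 = Thread(target=calc_velocity, args=('radial', [4, 5, 6], [6, 5, 4]))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(results)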
Hope that was helpful.
Answer 3:
Your first example is almost there. However, Pool.map() doesn't take an args keyword; it only lets you pass a single argument to the function. To pass multiple arguments, you have to pack them into another structure, like a tuple, as you did in your second example.
This modified version of your first example works.
from multiprocessing import Pool

def calc_velocity(work_args):
    buffer_list = []
    for index, line in enumerate(work_args[0]):
        buffer_list.append(work_args[0][index] * work_args[1][index])
    return buffer_list

data_axial = [1, 2, 3]
factors_axial = [3, 2, 1]

if __name__ == '__main__':
    p = Pool(4)
    work_args = (data_axial, factors_axial)
    axial_velocity = p.map(calc_velocity, [work_args])
If the calc_velocity function is actually representative of your function, then you could use numpy's multiply function to make it easier (and faster). Your calc_velocity function would just be:
import numpy

def calc_velocity(work_args):
    return numpy.multiply(work_args[0], work_args[1])
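Called the same way as in the modified example above, it just returns a numpy array instead of a plain list (a small usage sketch, assuming numpy is installed):

if __name__ == '__main__':
    p = Pool(4)
    work_args = (data_axial, factors_axial)
    axial_velocity = p.map(calc_velocity, [work_args])[0]  # a numpy array, e.g. array([3, 4, 3])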