I would like to integrate a system of differential equations for several parameter combinations using Python's multiprocessing module. So, the system should get integrated and the parameter combination should be stored as well as its index and the final value of one of the variables.
While that works fine when I use apply_async
- which is already faster than doing it in a simple for-loop - I fail to implement the same thing using map_async
which seems to be faster than apply_async
. The callback function is never called and I have no clue why. Could anyone explain why this happens and how to get the same output using map_async
instead of apply_async
?!
Here is my code:
from pylab import *
import multiprocessing as mp
from scipy.integrate import odeint
import time
#my system of differential equations
def myODE (yn,tvec,allpara):
(x, y, z) = yn
a, b = allpara['para']
dx = -x + a*y + x*x*y
dy = b - a*y - x*x*y
dz = x*y
return (dx, dy, dz)
#returns the index of the parameter combination, the parameters and the integrated solution
#this way I know which parameter combination belongs to which outcome in the asynch-case
def runMyODE(yn,tvec,allpara):
return allpara['index'],allpara['para'],transpose(odeint(myODE, yn, tvec, args=(allpara,)))
#for reproducibility
seed(0)
#time settings for integration
dt = 0.01
tmax = 50
tval = arange(0,tmax,dt)
numVar = 3 #number of variables (x, y, z)
numPar = 2 #number of parameters (a, b)
numComb = 5 #number of parameter combinations
INIT = zeros((numComb,numVar)) #initial conditions will be stored here
PARA = zeros((numComb,numPar)) #parameter combinations for a and b will be stored here
#create some initial conditions and random parameters
for combi in range(numComb):
INIT[combi,:] = append(10*rand(2),0) #initial conditions for x and y are randomly chosen, z is 0
PARA[combi,:] = 10*rand(2) #parameter a and b are chosen randomly
#################################using loop over apply####################
#results will be stored in here
asyncResultsApply = []
#my callback function
def saveResultApply(result):
# storing the index, a, b and the final value of z
asyncResultsApply.append((result[0], result[1], result[2][2,-1]))
#start the multiprocessing part
pool = mp.Pool(processes=4)
for combi in range(numComb):
pool.apply_async(runMyODE, args=(INIT[combi,:],tval,{'para': PARA[combi,:], 'index': combi}), callback=saveResultApply)
pool.close()
pool.join()
for res in asyncResultsApply:
print res[0], res[1], res[2] #printing the index, a, b and the final value of z
#######################################using map#####################
#the only difference is that the for loop is replaced by a "map_async" call
print "\n\nnow using map\n\n"
asyncResultsMap = []
#my callback function which is never called
def saveResultMap(result):
# storing the index, a, b and the final value of z
asyncResultsMap.append((result[0], result[1], result[2][2,-1]))
pool = mp.Pool(processes=4)
pool.map_async(lambda combi: runMyODE(INIT[combi,:], tval, {'para': PARA[combi,:], 'index': combi}), range(numComb), callback=saveResultMap)
pool.close()
pool.join()
#this does not work yet
for res in asyncResultsMap:
print res[0], res[1], res[2] #printing the index, a, b and the final value of z
If I understood you correctly, it stems from something that confuses people quite often.
apply_async
's callback is called after the single op, but so doesmap
's - it does not call the callback on each element, but rather once on the entire result.You are correct in noting that
map
is faster thanapply_async
s. If you want something to happen after each result, there are a few ways to go:You can effectively add the callback to the operation you want to be performed on each element, and
map
using that.You could use
imap
(orimap_unordered
) in a loop, and do the callback within the loop body. Of course, this means that all will be performed in the parent process, but the nature of stuff written as callbacks means that's usually not a problem (it tends to be cheap functions). YMMV.For example, suppose you have the functions
f
andcb
, and you'd like tomap
f
ones
withcb
for each op. Then you could either do:or