Julia pmap speed - parallel processing - dynamic p

2019-08-25 03:01发布

问题:

I am trying to speed up filling in a matrix for a dynamic programming problem in Julia (v0.6.0), and I can't seem to get much extra speed from using pmap. This is related to this question I posted almost a year ago: Filling a matrix using parallel processing in Julia. I was able to speed up serial processing with some great help then, and I'm now trying to get extra speed from parallel processing tools in Julia.

For the serial processing case, I was using a 3-dimensional matrix (essentially a set of equally-sized matrices, indexed by the 1st-dimension) and iterating over the 1st-dimension. I wanted to give pmap a try, though, to more efficiently iterate over the set of matrices.

Here is the code setup. To use pmap with the v_iter function below, I converted the three dimensional matrix into a dictionary object, with the dictionary keys equal to the index values in the 1st dimension (v_dict in the code below, with gcc equal to the 1st-dimension size). The v_iter function takes other dictionary objects (E_opt_dict and gridpoint_m_dict below) as additional inputs:

function v_iter(a,b,c)
   diff_v = 1
   while diff_v>convcrit
     diff_v = -Inf

     #These lines efficiently multiply the value function by the Markov transition matrix, using the A_mul_B function
     exp_v       = zeros(Float64,gkpc,1)
     A_mul_B!(exp_v,a[1:gkpc,:],Zprob[1,:])
     for j=2:gz
       temp=Array{Float64}(gkpc,1)
       A_mul_B!(temp,a[(j-1)*gkpc+1:(j-1)*gkpc+gkpc,:],Zprob[j,:])
       exp_v=hcat(exp_v,temp)
     end    

     #This tries to find the optimal value of v
     for h=1:gm
       for j=1:gz
         oldv = a[h,j]
         newv = (1-tau)*b[h,j]+beta*exp_v[c[h,j],j]
         a[h,j] = newv
         diff_v = max(diff_v, oldv-newv, newv-oldv)
       end
     end
   end
end

gz =  9  
gp =  13  
gk =  17  
gcc =  5  
gm    = gk * gp * gcc * gz
gkpc  = gk * gp * gcc
gkp = gk*gp
beta  = ((1+0.015)^(-1))
tau        = 0.35
Zprob = [0.43 0.38 0.15 0.03 0.00 0.00 0.00 0.00 0.00; 0.05 0.47 0.35 0.11 0.02 0.00 0.00 0.00 0.00; 0.01 0.10 0.50 0.30 0.08 0.01 0.00 0.00 0.00; 0.00 0.02 0.15 0.51 0.26 0.06 0.01  0.00 0.00; 0.00 0.00 0.03 0.21 0.52 0.21 0.03 0.00 0.00 ; 0.00 0.00  0.01  0.06 0.26 0.51 0.15 0.02 0.00 ; 0.00 0.00 0.00 0.01 0.08 0.30 0.50 0.10 0.01 ; 0.00 0.00 0.00 0.00 0.02 0.11 0.35 0.47 0.05; 0.00 0.00 0.00 0.00 0.00 0.03 0.15 0.38 0.43]
convcrit = 0.001   # chosen convergence criterion

E_opt                  = Array{Float64}(gcc,gm,gz)    
fill!(E_opt,10.0)

gridpoint_m   = Array{Int64}(gcc,gm,gz)
fill!(gridpoint_m,fld(gkp,2)) 

v_dict=Dict(i => zeros(Float64,gm,gz) for i=1:gcc)
E_opt_dict=Dict(i => E_opt[i,:,:] for i=1:gcc)
gridpoint_m_dict=Dict(i => gridpoint_m[i,:,:] for i=1:gcc) 

For parallel processing, I executed the following two commands:

wp = CachingPool(workers())
addprocs(3)
pmap(wp,v_iter,values(v_dict),values(E_opt_dict),values(gridpoint_m_dict))

...which produced this performance:

135.626417 seconds (3.29 G allocations: 57.152 GiB, 3.74% gc time)

I then tried to serial process instead:

for i=1:gcc
    v_iter(v_dict[i],E_opt_dict[i],gridpoint_m_dict[i])
end

...and received better performance.

128.263852 seconds (3.29 G allocations: 57.101 GiB, 4.53% gc time)

This also gives me about the same performance as running v_iter on the original 3-dimensional objects:

v=zeros(Float64,gcc,gm,gz)
for i=1:gcc
    v_iter(v[i,:,:],E_opt[i,:,:],gridpoint_m[i,:,:])
end

I know that parallel processing involves setup time, but when I increase the value of gcc, I still get about equal processing time for serial and parallel. This seems like a good candidate for parallel processing, since there is no need for messaging between the workers! But I can't seem to make it work efficiently.

回答1:

You create the CachingPool before adding the worker processes. Hence your caching pool passed to pmap tells it to use just a single worker. You can simply check it by running wp.workers you will see something like Set([1]). Hence it should be: addprocs(3) wp = CachingPool(workers()) You could also consider running Julia -p command line parameter e.g. julia -p 3 and then you can skip the addprocs(3) command.

On top of that your for and pmap loops are not equivalent. The Julia Dict object is a hashmap and similar to other languages does not offer anything like element order. Hence in your for loop you are guaranteed to get the same matching i-th element while with the values the ordering of values does not need to match the original ordering (and you can have different order for each of those three variables in the pmap loop). Since the keys for your Dicts are just numbers from 1 up to gcc you should simply use arrays instead. You can use generators very similar to Python. For an example instead of v_dict=Dict(i => zeros(Float64,gm,gz) for i=1:gcc) use v_dict_a = [zeros(Float64,gm,gz) for i=1:gcc]

Hope that helps.



回答2:

Based on @Przemyslaw Szufeul's helpful advice, I've placed below the code that properly executes parallel processing. After running it once, I achieved substantial improvement in running time: 77.728264 seconds (181.20 k allocations: 12.548 MiB)

In addition to reordering the wp command and using the generator Przemyslaw recommended, I also recast v_iter as an anonymous function, in order to avoid having to sprinkle @everywhere around the code to feed functions and data to the workers.

I also added return a to the v_iter function, and set v_a below equal to the output of pmap, since you cannot pass by reference to a remote object.

addprocs(3)
v_iter = function(a,b,c)
   diff_v = 1
   while diff_v>convcrit
     diff_v = -Inf

     #These lines efficiently multiply the value function by the Markov transition matrix, using the A_mul_B function
     exp_v       = zeros(Float64,gkpc,1)
     A_mul_B!(exp_v,a[1:gkpc,:],Zprob[1,:])
     for j=2:gz
       temp=Array{Float64}(gkpc,1)
       A_mul_B!(temp,a[(j-1)*gkpc+1:(j-1)*gkpc+gkpc,:],Zprob[j,:])
       exp_v=hcat(exp_v,temp)
     end    

     #This tries to find the optimal value of v
     for h=1:gm
       for j=1:gz
         oldv = a[h,j]
         newv = (1-tau)*b[h,j]+beta*exp_v[c[h,j],j]
         a[h,j] = newv
         diff_v = max(diff_v, oldv-newv, newv-oldv)
       end
     end
   end
  return a
end

gz =  9  
gp =  13  
gk =  17  
gcc =  5  
gm    = gk * gp * gcc * gz
gkpc  = gk * gp * gcc
gkp   =gk*gp
beta  = ((1+0.015)^(-1))
tau        = 0.35
Zprob = [0.43 0.38 0.15 0.03 0.00 0.00 0.00 0.00 0.00; 0.05 0.47 0.35 0.11 0.02 0.00 0.00 0.00 0.00; 0.01 0.10 0.50 0.30 0.08 0.01 0.00 0.00 0.00; 0.00 0.02 0.15 0.51 0.26 0.06 0.01  0.00 0.00; 0.00 0.00 0.03 0.21 0.52 0.21 0.03 0.00 0.00 ; 0.00 0.00  0.01  0.06 0.26 0.51 0.15 0.02 0.00 ; 0.00 0.00 0.00 0.01 0.08 0.30 0.50 0.10 0.01 ; 0.00 0.00 0.00 0.00 0.02 0.11 0.35 0.47 0.05; 0.00 0.00 0.00 0.00 0.00 0.03 0.15 0.38 0.43]
convcrit = 0.001   # chosen convergence criterion

E_opt                  = Array{Float64}(gcc,gm,gz)    
fill!(E_opt,10.0)

gridpoint_m   = Array{Int64}(gcc,gm,gz)
fill!(gridpoint_m,fld(gkp,2)) 

v_a=[zeros(Float64,gm,gz) for i=1:gcc]
E_opt_a=[E_opt[i,:,:] for i=1:gcc]
gridpoint_m_a=[gridpoint_m[i,:,:] for i=1:gcc]

wp = CachingPool(workers())
v_a = pmap(wp,v_iter,v_a,E_opt_a,gridpoint_m_a)