Filling a matrix using parallel processing in Juli

2019-07-31 21:23发布

问题:

I'm trying to speed up the solution time for a dynamic programming problem in Julia (v. 0.5.0), via parallel processing. The problem involves choosing the optimal values for every element of a 1073 x 19 matrix at every iteration, until successive matrix differences fall within a tolerance. I thought that, within each iteration, filling in the values for each element of the matrix could be parallelized. However, I'm seeing a huge performance degradation using SharedArray, and I'm wondering if there's a better way to approach parallel processing for this problem.

I construct the arguments for the function below:

    est_params = [.788,.288,.0034,.1519,.1615,.0041,.0077,.2,0.005,.7196]

    r          = 0.015
    tau        = 0.35
    rho        =est_params[1]
    sigma      =est_params[2]
    delta      = 0.15
    gamma      =est_params[3]
    a_capital  =est_params[4]
    lambda1    =est_params[5]
    lambda2    =est_params[6]
    s          =est_params[7]
    theta      =est_params[8]
    mu         =est_params[9]
    p_bar_k_ss  =est_params[10]
    beta  = (1+r)^(-1)
    sigma_range = 4
    gz = 19 
    gp = 29 
    gk = 37 

    lnz=collect(linspace(-sigma_range*sigma,sigma_range*sigma,gz))
    z=exp(lnz)

    gk_m = fld(gk,2)
    # Need to add mu somewhere to k_ss
    k_ss =  (theta*(1-tau)/(r+delta))^(1/(1-theta))
    k=cat(1,map(i->k_ss*((1-delta)^i),collect(1:gk_m)),map(i->k_ss/((1-delta)^i),collect(1:gk_m)))
    insert!(k,gk_m+1,k_ss)
    sort!(k)
    p_bar=p_bar_k_ss*k_ss
    p = collect(linspace(-p_bar/2,p_bar,gp))

    #Tauchen
    N     = length(z)
    Z     = zeros(N,1)
    Zprob = zeros(Float32,N,N)

    Z[N]  = lnz[length(z)]
    Z[1]  = lnz[1]

    zstep = (Z[N] - Z[1]) / (N - 1)

    for i=2:(N-1)
        Z[i] = Z[1] + zstep * (i - 1)
    end

    for a = 1 : N
        for b = 1 : N
          if b == 1
              Zprob[a,b] = 0.5*erfc(-((Z[1] - mu - rho * Z[a] + zstep / 2) / sigma)/sqrt(2))
          elseif b == N
              Zprob[a,b] = 1 - 0.5*erfc(-((Z[N] - mu - rho * Z[a] - zstep / 2) / sigma)/sqrt(2))
          else
              Zprob[a,b] = 0.5*erfc(-((Z[b] - mu - rho * Z[a] + zstep / 2) / sigma)/sqrt(2)) -
                           0.5*erfc(-((Z[b] - mu - rho * Z[a] - zstep / 2) / sigma)/sqrt(2))
          end
        end
    end
    # Collecting tauchen results in a 2 element array of linspace and array; [2] gets array
    # Zprob=collect(tauchen(gz, rho, sigma, mu, sigma_range))[2]
    Zcumprob=zeros(Float32,gz,gz)
    # 2 in cumsum! denotes the 2nd dimension, i.e. columns
    cumsum!(Zcumprob, Zprob,2)


    gm = gk * gp

    control=zeros(gm,2)
    for i=1:gk
        control[(1+gp*(i-1)):(gp*i),1]=fill(k[i],(gp,1))
        control[(1+gp*(i-1)):(gp*i),2]=p
    end
    endog=copy(control)

    E=Array(Float32,gm,gm,gz)
    for h=1:gm
       for m=1:gm
           for j=1:gz
             # set the nonzero net debt indicator
              if endog[h,2]<0
                 p_ind=1

              else
                 p_ind=0
              end

               # set the investment indicator
                if (control[m,1]-(1-delta)*endog[h,1])!=0
                   i_ind=1
                else
                   i_ind=0
                end

                E[m,h,j] = (1-tau)*z[j]*(endog[h,1]^theta) + control[m,2]-endog[h,2]*(1+r*(1-tau))  +
                    delta*endog[h,1]*tau-(control[m,1]-(1-delta)*endog[h,1]) -
                     (i_ind*gamma*endog[h,1]+endog[h,1]*(a_capital/2)*(((control[m,1]-(1-delta)*endog[h,1])/endog[h,1])^2)) +
                    s*endog[h,2]*p_ind
                elem = E[m,h,j]
                if E[m,h,j]<0
                    E[m,h,j]=elem+lambda1*elem-.5*lambda2*elem^2
                else
                    E[m,h,j]=elem
                end
            end
        end
     end

I then constructed the function with serial processing. The two for loops iterate through each element to find the largest value in a 1072-sized (=the gm scalar argument in the function) array:

function dynam_serial(E,gm,gz,beta,Zprob)
    v           = Array(Float32,gm,gz )
    fill!(v,E[cld(gm,2),cld(gm,2),cld(gz,2)])

    Tv          = Array(Float32,gm,gz)

    # Set parameters for the loop
    convcrit = 0.0001   # chosen convergence criterion
    diff = 1          # arbitrary initial value greater than convcrit

    while diff>convcrit
      exp_v=v*Zprob'

      for h=1:gm
        for j=1:gz
          Tv[h,j]=findmax(E[:,h,j] + beta*exp_v[:,j])[1]
        end
      end

      diff = maxabs(Tv - v)
      v=copy(Tv)
    end
end

Timing this, I get:

@time dynam_serial(E,gm,gz,beta,Zprob)

> 106.880008 seconds (91.70 M allocations: 203.233 GB, 15.22% gc time)

Now, I try using Shared Arrays to benefit from parallel processing. Note that I reconfigured the iteration so that I only have one for loop, rather than two. I also use v=deepcopy(Tv); otherwise, v is copied as an Array object, rather than a SharedArray:

function dynam_parallel(E,gm,gz,beta,Zprob)
    v           = SharedArray(Float32,(gm,gz),init = S -> S[Base.localindexes(S)] = myid() )
    fill!(v,E[cld(gm,2),cld(gm,2),cld(gz,2)])

    # Set parameters for the loop
    convcrit = 0.0001   # chosen convergence criterion
    diff = 1          # arbitrary initial value greater than convcrit

    while diff>convcrit
      exp_v=v*Zprob'
      Tv          = SharedArray(Float32,gm,gz,init = S -> S[Base.localindexes(S)] = myid() )

      @sync @parallel for hj=1:(gm*gz)
         j=cld(hj,gm)
         h=mod(hj,gm)
         if h==0;h=gm;end;

         @async Tv[h,j]=findmax(E[:,h,j] + beta*exp_v[:,j])[1]
      end

      diff = maxabs(Tv - v)
      v=deepcopy(Tv)
    end
end

Timing the parallel version; and using a 4-core 2.5 GHz I7 processor with 16GB of memory, I get:

addprocs(3)
@time dynam_parallel(E,gm,gz,beta,Zprob)

> 164.237208 seconds (2.64 M allocations: 201.812 MB, 0.04% gc time)

Am I doing something incorrect here? Or is there a better way to approach parallel processing in Julia for this particular problem? I've considered using Distributed Arrays, but it's difficult for me to see how to apply them to the present problem.

UPDATE: Per @DanGetz and his helpful comments, I turned instead to trying to speed up the serial processing version. I was able to get performance down to 53.469780 seconds (67.36 M allocations: 103.419 GiB, 19.12% gc time) through:

1) Upgrading to 0.6.0 (saved about 25 seconds), which includes the helpful @views macro.

2) Preallocating the main array I'm trying to fill in (Tv), per the section on Preallocating Outputs in the Julia Performance Tips: https://docs.julialang.org/en/latest/manual/performance-tips/. (saved another 25 or so seconds)

The biggest remaining slow-down seems to be coming from the add_vecs function, which sums together subarrays of two larger matrices. I've tried devectorizing and using BLAS functions, but haven't been able to produce better performance.

In any event, the improved code for dynam_serial is below:

function add_vecs(r::Array{Float32},h::Int,j::Int,E::Array{Float32},exp_v::Array{Float32},beta::Float32)

  @views r=E[:,h,j] + beta*exp_v[:,j]
  return r
end


function dynam_serial(E::Array{Float32},gm::Int,gz::Int,beta::Float32,Zprob::Array{Float32})
    v           = Array{Float32}(gm,gz)
    fill!(v,E[cld(gm,2),cld(gm,2),cld(gz,2)])
    Tv          = Array{Float32}(gm,gz)
    r           = Array{Float32}(gm)

    # Set parameters for the loop
    convcrit = 0.0001   # chosen convergence criterion
    diff = 1          # arbitrary initial value greater than convcrit

    while diff>convcrit
      exp_v=v*Zprob'

      for h=1:gm
        for j=1:gz
          @views Tv[h,j]=findmax(add_vecs(r,h,j,E,exp_v,beta))[1]
        end
      end

      diff = maximum(abs,Tv - v)
      v=copy(Tv)
    end
    return Tv
end

回答1:

If add_vecs seems to be the critical function, writing an explicit for loop could offer more optimization. How does the following benchmark:

function add_vecs!(r::Array{Float32},h::Int,j::Int,E::Array{Float32},
                  exp_v::Array{Float32},beta::Float32)
    @inbounds for i=1:size(E,1)
        r[i]=E[i,h,j] + beta*exp_v[i,j]
    end
    return r
end

UPDATE

To continue optimizing dynam_serial I have tried to remove more allocations. The result is:

function add_vecs_and_max!(gm::Int,r::Array{Float64},h::Int,j::Int,E::Array{Float64},
                           exp_v::Array{Float64},beta::Float64)
    @inbounds for i=1:gm            
        r[i] = E[i,h,j]+beta*exp_v[i,j]
    end
    return findmax(r)[1]
end

function dynam_serial(E::Array{Float64},gm::Int,gz::Int,
                      beta::Float64,Zprob::Array{Float64})
    v           = Array{Float64}(gm,gz)
    fill!(v,E[cld(gm,2),cld(gm,2),cld(gz,2)])
    r           = Array{Float64}(gm)
    exp_v       = Array{Float64}(gm,gz)

    # Set parameters for the loop
    convcrit = 0.0001   # chosen convergence criterion
    diff = 1.0 # arbitrary initial value greater than convcrit
    while diff>convcrit
        A_mul_Bt!(exp_v,v,Zprob)
        diff = -Inf
        for h=1:gm
            for j=1:gz
                oldv = v[h,j]
                newv = add_vecs_and_max!(gm,r,h,j,E,exp_v,beta)
                v[h,j]= newv
                diff = max(diff, oldv-newv, newv-oldv)
            end
        end
    end
    return v
end

Switching the functions to use Float64 should increase speed (as CPUs are inherently optimized for 64-bit word lengths). Also, using the mutating A_mul_Bt! directly saves another allocation. Avoiding the copy(...) by switching the arrays v and Tv.

How do these optimizations improve your running time?

2nd UPDATE

Updated the code in the UPDATE section to use findmax. Also, changed dynam_serial to use v without Tv, as there was no need to save the old version except for the diff calculation, which is now done inside the loop.



回答2:

Here's the code I copied-and-pasted, provided by Dan Getz above. I include the array and scalar definitions exactly as I ran them. Performance was: 39.507005 seconds (11 allocations: 486.891 KiB) when running @time dynam_serial(E,gm,gz,beta,Zprob).

using SpecialFunctions
est_params = [.788,.288,.0034,.1519,.1615,.0041,.0077,.2,0.005,.7196]

r          = 0.015
tau        = 0.35
rho        =est_params[1]
sigma      =est_params[2]
delta      = 0.15
gamma      =est_params[3]
a_capital  =est_params[4]
lambda1    =est_params[5]
lambda2    =est_params[6]
s          =est_params[7]
theta      =est_params[8]
mu         =est_params[9]
p_bar_k_ss  =est_params[10]
beta  = (1+r)^(-1)
sigma_range = 4
gz = 19 #15 #19
gp = 29 #19 #29
gk = 37 #25 #37

lnz=collect(linspace(-sigma_range*sigma,sigma_range*sigma,gz))
z=exp.(lnz)

gk_m = fld(gk,2)
# Need to add mu somewhere to k_ss
k_ss =  (theta*(1-tau)/(r+delta))^(1/(1-theta))
k=cat(1,map(i->k_ss*((1-delta)^i),collect(1:gk_m)),map(i->k_ss/((1-delta)^i),collect(1:gk_m)))
insert!(k,gk_m+1,k_ss)
sort!(k)
p_bar=p_bar_k_ss*k_ss
p = collect(linspace(-p_bar/2,p_bar,gp))

#Tauchen
N     = length(z)
Z     = zeros(N,1)
Zprob = zeros(Float64,N,N)

Z[N]  = lnz[length(z)]
Z[1]  = lnz[1]

zstep = (Z[N] - Z[1]) / (N - 1)

for i=2:(N-1)
    Z[i] = Z[1] + zstep * (i - 1)
end

for a = 1 : N
    for b = 1 : N
      if b == 1
          Zprob[a,b] = 0.5*erfc(-((Z[1] - mu - rho * Z[a] + zstep / 2) / sigma)/sqrt(2))
      elseif b == N
          Zprob[a,b] = 1 - 0.5*erfc(-((Z[N] - mu - rho * Z[a] - zstep / 2) / sigma)/sqrt(2))
      else
          Zprob[a,b] = 0.5*erfc(-((Z[b] - mu - rho * Z[a] + zstep / 2) / sigma)/sqrt(2)) -
                       0.5*erfc(-((Z[b] - mu - rho * Z[a] - zstep / 2) / sigma)/sqrt(2))
      end
    end
end
# Collecting tauchen results in a 2 element array of linspace and array; [2] gets array
# Zprob=collect(tauchen(gz, rho, sigma, mu, sigma_range))[2]
Zcumprob=zeros(Float64,gz,gz)
# 2 in cumsum! denotes the 2nd dimension, i.e. columns
cumsum!(Zcumprob, Zprob,2)


gm = gk * gp

control=zeros(gm,2)
for i=1:gk
    control[(1+gp*(i-1)):(gp*i),1]=fill(k[i],(gp,1))
    control[(1+gp*(i-1)):(gp*i),2]=p
end
endog=copy(control)

E=Array(Float64,gm,gm,gz)
for h=1:gm
   for m=1:gm
       for j=1:gz
         # set the nonzero net debt indicator
          if endog[h,2]<0
             p_ind=1

          else
             p_ind=0
          end

           # set the investment indicator
            if (control[m,1]-(1-delta)*endog[h,1])!=0
               i_ind=1
            else
               i_ind=0
            end

            E[m,h,j] = (1-tau)*z[j]*(endog[h,1]^theta) + control[m,2]-endog[h,2]*(1+r*(1-tau))  +
                delta*endog[h,1]*tau-(control[m,1]-(1-delta)*endog[h,1]) -
                 (i_ind*gamma*endog[h,1]+endog[h,1]*(a_capital/2)*(((control[m,1]-(1-delta)*endog[h,1])/endog[h,1])^2)) +
                s*endog[h,2]*p_ind
            elem = E[m,h,j]
            if E[m,h,j]<0
                E[m,h,j]=elem+lambda1*elem-.5*lambda2*elem^2
            else
                E[m,h,j]=elem
            end
        end
    end
 end

function add_vecs_and_max!(gm::Int,r::Array{Float64},h::Int,j::Int,E::Array{Float64},
                           exp_v::Array{Float64},beta::Float64)
    maxr = -Inf
    @inbounds for i=1:gm            r[i] = E[i,h,j]+beta*exp_v[i,j]
        maxr = max(r[i],maxr)
    end
    return maxr
end

function dynam_serial(E::Array{Float64},gm::Int,gz::Int,
                      beta::Float64,Zprob::Array{Float64})
    v           = Array{Float64}(gm,gz)
    fill!(v,E[cld(gm,2),cld(gm,2),cld(gz,2)])
    Tv          = Array{Float64}(gm,gz)
    r           = Array{Float64}(gm)
    exp_v       = Array{Float64}(gm,gz)

    # Set parameters for the loop
    convcrit = 0.0001   # chosen convergence criterion
    diff = 1.0 # arbitrary initial value greater than convcrit
    while diff>convcrit
        A_mul_Bt!(exp_v,v,Zprob)
        diff = -Inf
        for h=1:gm
            for j=1:gz
                Tv[h,j]=add_vecs_and_max!(gm,r,h,j,E,exp_v,beta)
                diff = max(abs(Tv[h,j]-v[h,j]),diff)
            end
        end
        (v,Tv)=(Tv,v)
    end
    return v
end

Now, here's another version of the algorithm and inputs. The functions are similar to what Dan Getz suggested, except that I use findmax rather than an iterated max function to find the array maximum. In the input construction, I am using both Float32 and mixing different bit-types together. However, I've consistently achieved better performance this way: 24.905569 seconds (1.81 k allocations: 46.829 MiB, 0.01% gc time). But it's not clear at all why.

using SpecialFunctions
est_params = [.788,.288,.0034,.1519,.1615,.0041,.0077,.2,0.005,.7196]

r          = 0.015
tau        = 0.35
rho        =est_params[1]
sigma      =est_params[2]
delta      = 0.15
gamma      =est_params[3]
a_capital  =est_params[4]
lambda1    =est_params[5]
lambda2    =est_params[6]
s          =est_params[7]
theta      =est_params[8]
mu         =est_params[9]
p_bar_k_ss  =est_params[10]
beta  = Float32((1+r)^(-1))
sigma_range = 4
gz = 19
gp = 29
gk = 37

lnz=collect(linspace(-sigma_range*sigma,sigma_range*sigma,gz))
z=exp(lnz)

gk_m = fld(gk,2)
# Need to add mu somewhere to k_ss
k_ss =  (theta*(1-tau)/(r+delta))^(1/(1-theta))
k=cat(1,map(i->k_ss*((1-delta)^i),collect(1:gk_m)),map(i->k_ss/((1-delta)^i),collect(1:gk_m)))
insert!(k,gk_m+1,k_ss)
sort!(k)
p_bar=p_bar_k_ss*k_ss
p = collect(linspace(-p_bar/2,p_bar,gp))

#Tauchen
N     = length(z)
Z     = zeros(N,1)
Zprob = zeros(Float32,N,N)

Z[N]  = lnz[length(z)]
Z[1]  = lnz[1]

zstep = (Z[N] - Z[1]) / (N - 1)

for i=2:(N-1)
    Z[i] = Z[1] + zstep * (i - 1)
end

for a = 1 : N
    for b = 1 : N
      if b == 1
          Zprob[a,b] = 0.5*erfc(-((Z[1] - mu - rho * Z[a] + zstep / 2) / sigma)/sqrt(2))
      elseif b == N
          Zprob[a,b] = 1 - 0.5*erfc(-((Z[N] - mu - rho * Z[a] - zstep / 2) / sigma)/sqrt(2))
      else
          Zprob[a,b] = 0.5*erfc(-((Z[b] - mu - rho * Z[a] + zstep / 2) / sigma)/sqrt(2)) -
                       0.5*erfc(-((Z[b] - mu - rho * Z[a] - zstep / 2) / sigma)/sqrt(2))
      end
    end
end
# Collecting tauchen results in a 2 element array of linspace and array; [2] gets array
# Zprob=collect(tauchen(gz, rho, sigma, mu, sigma_range))[2]
Zcumprob=zeros(Float32,gz,gz)
# 2 in cumsum! denotes the 2nd dimension, i.e. columns
cumsum!(Zcumprob, Zprob,2)


gm = gk * gp

control=zeros(gm,2)
for i=1:gk
    control[(1+gp*(i-1)):(gp*i),1]=fill(k[i],(gp,1))
    control[(1+gp*(i-1)):(gp*i),2]=p
end
endog=copy(control)

E=Array(Float32,gm,gm,gz)
for h=1:gm
   for m=1:gm
       for j=1:gz
         # set the nonzero net debt indicator
          if endog[h,2]<0
             p_ind=1

          else
             p_ind=0
          end

           # set the investment indicator
            if (control[m,1]-(1-delta)*endog[h,1])!=0
               i_ind=1
            else
               i_ind=0
            end

            E[m,h,j] = (1-tau)*z[j]*(endog[h,1]^theta) + control[m,2]-endog[h,2]*(1+r*(1-tau))  +
                delta*endog[h,1]*tau-(control[m,1]-(1-delta)*endog[h,1]) -
                 (i_ind*gamma*endog[h,1]+endog[h,1]*(a_capital/2)*(((control[m,1]-(1-delta)*endog[h,1])/endog[h,1])^2)) +
                s*endog[h,2]*p_ind
            elem = E[m,h,j]
            if E[m,h,j]<0
                E[m,h,j]=elem+lambda1*elem-.5*lambda2*elem^2
            else
                E[m,h,j]=elem
            end
        end
    end
 end

 function add_vecs!(gm::Int,r::Array{Float32},h::Int,j::Int,E::Array{Float32},
                   exp_v::Array{Float32},beta::Float32)

     @inbounds @views for i=1:gm
         r[i]=E[i,h,j] + beta*exp_v[i,j]
     end
     return r
 end

 function dynam_serial(E::Array{Float32},gm::Int,gz::Int,beta::Float32,Zprob::Array{Float32})
     v           = Array{Float32}(gm,gz)
     fill!(v,E[cld(gm,2),cld(gm,2),cld(gz,2)])

     Tv          = Array{Float32}(gm,gz)

     # Set parameters for the loop
     convcrit = 0.0001   # chosen convergence criterion
     diff = 1.00000          # arbitrary initial value greater than convcrit
     iter=0
     exp_v=Array{Float32}(gm,gz)

     r=Array{Float32}(gm)

     while diff>convcrit


       A_mul_Bt!(exp_v,v,Zprob)

       for h=1:gm
         for j=1:gz
           Tv[h,j]=findmax(add_vecs!(gm,r,h,j,E,exp_v,beta))[1]
         end
       end
       diff = maximum(abs,Tv - v)
       (v,Tv)=(Tv,v)
     end
     return v
 end