Parallelising gradient calculation in Julia

2019-04-07 12:51发布

I was persuaded some time ago to drop my comfortable matlab programming and start programming in Julia. I have been working for a long with neural networks and I thought that, now with Julia, I could get things done faster by parallelising the calculation of the gradient.

The gradient need not be calculated on the entire dataset in one go; instead one can split the calculation. For instance, by splitting the dataset in parts, we can calculate a partial gradient on each part. The total gradient is then calculated by adding up the partial gradients.

Though, the principle is simple, when I parallelise with Julia I get a performance degradation, i.e. one process is faster then two processes! I am obviously doing something wrong... I have consulted other questions asked in the forum but I could still not piece together an answer. I think my problem lies in that there is a lot of unnecessary data moving going on, but I can't fix it properly.

In order to avoid posting messy neural network code, I am posting below a simpler example that replicates my problem in the setting of linear regression.

The code-block below creates some data for a linear regression problem. The code explains the constants, but X is the matrix containing the data inputs. We randomly create a weight vector w which when multiplied with X creates some targets Y.

######################################
## CREATE LINEAR REGRESSION PROBLEM ##
######################################

# This code implements a simple linear regression problem

MAXITER = 100   # number of iterations for simple gradient descent
N = 10000       # number of data items
D = 50          # dimension of data items
X = randn(N, D) # create random matrix of data, data items appear row-wise
Wtrue = randn(D,1) # create arbitrary weight matrix to generate targets
Y = X*Wtrue     # generate targets

The next code-block below defines functions for measuring the fitness of our regression (i.e. the negative log-likelihood) and the gradient of the weight vector w:

####################################
##       DEFINE FUNCTIONS         ##
####################################

@everywhere  begin

  #-------------------------------------------------------------------
  function negative_loglikelihood(Y,X,W)
  #-------------------------------------------------------------------

    # number of data items
    N  = size(X,1)
    # accumulate here log-likelihood
    ll = 0
    for nn=1:N
      ll = ll - 0.5*sum((Y[nn,:] - X[nn,:]*W).^2)
    end

    return ll
  end


  #-------------------------------------------------------------------
  function negative_loglikelihood_grad(Y,X,W, first_index,last_index)
  #-------------------------------------------------------------------

    # number of data items
    N  = size(X,1)
    # accumulate here gradient contributions by each data item
    grad = zeros(similar(W))
    for nn=first_index:last_index
      grad = grad +  X[nn,:]' * (Y[nn,:] - X[nn,:]*W)
    end

    return grad
  end


end

Note that the above functions are on purpose not vectorised! I choose not to vectorise, as the final code (the neural network case) will also not admit any vectorisation (let us not get into more details regarding this).

Finally, the code-block below shows a very simple gradient descent that tries to recover the parameter weight vector w from the given data Y and X:

####################################
##     SOLVE LINEAR REGRESSION    ##
####################################


# start from random initial solution
W = randn(D,1)

# learning rate, set here to some arbitrary small constant
eta = 0.000001

# the following for-loop implements simple gradient descent
for iter=1:MAXITER

  # get gradient
  ref_array = Array(RemoteRef, nworkers())

  # let each worker process part of matrix X
  for index=1:length(workers())

    # first index of subset of X that worker should work on
    first_index       = (index-1)*int(ceil(N/nworkers())) + 1
    # last index of subset of X that worker should work on
    last_index        = min((index)*(int(ceil(N/nworkers()))), N)

    ref_array[index] = @spawn negative_loglikelihood_grad(Y,X,W, first_index,last_index)
  end

  # gather the gradients calculated on parts of matrix X
  grad = zeros(similar(W))
  for index=1:length(workers())
    grad = grad + fetch(ref_array[index])
  end

  # now that we have the gradient we can update parameters W
  W = W + eta*grad;

  # report progress, monitor optimisation
  @printf("Iter %d neg_loglikel=%.4f\n",iter, negative_loglikelihood(Y,X,W))
end

As is hopefully visible, I tried to parallelise the calculation of the gradient in the easiest possible way here. My strategy is to break the calculation of the gradient in as many parts as available workers. Each worker is required to work only on part of matrix X, which part is specified by first_index and last_index. Hence, each worker should work with X[first_index:last_index,:]. For instance, for 4 workers and N = 10000, the work should be divided as follows:

  • worker 1 => first_index = 1, last_index = 2500
  • worker 2 => first_index = 2501, last_index = 5000
  • worker 3 => first_index = 5001, last_index = 7500
  • worker 4 => first_index = 7501, last_index = 10000

Unfortunately, this entire code works faster if I have only one worker. If add more workers via addprocs(), the code runs slower. One can aggravate this issue by create more data items, for instance use instead N=20000. With more data items, the degradation is even more pronounced. In my particular computing environment with N=20000 and one core, the code runs in ~9 secs. With N=20000 and 4 cores it takes ~18 secs!

I tried many many different things inspired by the questions and answers in this forum but unfortunately to no avail. I realise that the parallelisation is naive and that data movement must be the problem, but I have no idea how to do it properly. It seems that the documentation is also a bit scarce on this issue (as is the nice book by Ivo Balbaert).

I would appreciate your help as I have been stuck for quite some while with this and I really need it for my work. For anyone wanting to run the code, to save you the trouble of copying-pasting you can get the code here.

Thanks for taking the time to read this very lengthy question! Help me turn this into a model answer that anyone new in Julia can then consult!

2条回答
何必那么认真
2楼-- · 2019-04-07 13:41

I would say that GD is not a good candidate for parallelizing it using any of the proposed methods: either SharedArray or DistributedArray, or own implementation of distribution of chunks of data.

The problem does not lay in Julia, but in the GD algorithm. Consider the code:

Main process:

for iter = 1:iterations #iterations: "the more the better"
    δ = _gradient_descent_shared(X, y, θ) 
    θ = θ -  α * (δ/N)   
end

The problem is in the above for-loop which is a must. No matter how good _gradient_descent_shared is, the total number of iterations kills the noble concept of the parallelization.

After reading the question and the above suggestion I've started implementing GD using SharedArray. Please note, I'm not an expert in the field of SharedArrays.

The main process parts (simple implementation without regularization):

run_gradient_descent(X::SharedArray, y::SharedArray, θ::SharedArray, α, iterations) = begin
  N = length(y)

  for iter = 1:iterations 
    δ = _gradient_descent_shared(X, y, θ) 
    θ = θ -  α * (δ/N)   
  end     
  θ
end

_gradient_descent_shared(X::SharedArray, y::SharedArray, θ::SharedArray, op=(+)) = begin            
    if size(X,1) <= length(procs(X))
        return _gradient_descent_serial(X, y, θ)
    else
        rrefs = map(p -> (@spawnat p _gradient_descent_serial(X, y, θ)), procs(X))
        return mapreduce(r -> fetch(r), op, rrefs)
    end 
end

The code common to all workers:

#= Returns the range of indices of a chunk for every worker on which it can work. 
The function splits data examples (N rows into chunks), 
not the parts of the particular example (features dimensionality remains intact).=# 
@everywhere function _worker_range(S::SharedArray)
    idx = indexpids(S)
    if idx == 0
        return 1:size(S,1), 1:size(S,2)
    end
    nchunks = length(procs(S))
    splits = [round(Int, s) for s in linspace(0,size(S,1),nchunks+1)]
    splits[idx]+1:splits[idx+1], 1:size(S,2)
end

#Computations on the chunk of the all data.
@everywhere _gradient_descent_serial(X::SharedArray, y::SharedArray, θ::SharedArray)  = begin
    prange = _worker_range(X)

    pX = sdata(X[prange[1], prange[2]])
    py = sdata(y[prange[1],:])

    tempδ = pX' * (pX * sdata(θ) .- py)         
end

The data loading and training. Let me assume that we have:

  • features in X::Array of the size (N,D), where N - number of examples, D-dimensionality of the features
  • labels in y::Array of the size (N,1)

The main code might look like this:

X=[ones(size(X,1)) X] #adding the artificial coordinate 
N, D = size(X)
MAXITER = 500
α = 0.01

initialθ = SharedArray(Float64, (D,1))
sX = convert(SharedArray, X)
sy = convert(SharedArray, y)  
X = nothing
y = nothing
gc() 

finalθ = run_gradient_descent(sX, sy, initialθ, α, MAXITER);

After implementing this and run (on 8-cores of my Intell Clore i7) I got a very slight acceleration over serial GD (1-core) on my training multiclass (19 classes) training data (715 sec for serial GD / 665 sec for shared GD).

If my implementation is correct (please check this out - I'm counting on that) then parallelization of the GD algorithm is not worth of that. Definitely you might get better acceleration using stochastic GD on 1-core.

查看更多
老娘就宠你
3楼-- · 2019-04-07 13:45

If you want to reduce the amount of data movement, you should strongly consider using SharedArrays. You could preallocate just one output vector, and pass it as an argument to each worker. Each worker sets a chunk of it, just as you suggested.

查看更多
登录 后发表回答