I need to perform some operations on a tensor and I would like to make this parallel.
Consider the following example:
# first part without doParallel
N = 8192
M = 128
F = 64
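# note: ma() relies on stats::filter (a moving-average/convolution filter), not dplyr::filter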
ma <- function(x,n=5){filter(x,rep(1/n,n), sides=2)}
m <- array(rexp(N*M*F), dim=c(N,M,F))
new_m <- array(0, dim=c(N,M,F))
system.time( for(i in 1:N) {
  for(j in 1:F) {
    ma_r <- ma(m[i,,j],2)
    ma_r <- c(ma_r[-length(ma_r)], ma_r[(length(ma_r)-1)])
    new_m[i,,j] <- ma_r
  }
}
)
This takes around 38 seconds on my laptop.
The following is with doParallel:
# second part with doParallel
library(doParallel)
no_cores <- detectCores() - 1
cl <- makeCluster(no_cores, type="FORK")
registerDoParallel(cl)
calcMat <- function(x){
  n <- dim(x)[1]
  m <- dim(x)[2]
  new_x <- matrix(0, nrow=n, ncol=m)
  for(j in 1:ncol(x)) {
    ma_r <- ma(x[,j],2)
    ma_r <- c(ma_r[-length(ma_r)], ma_r[(length(ma_r)-1)])
    new_x[,j] <- ma_r
  }
  return(new_x)
}
system.time( a_list <- foreach(i=1:N) %dopar% {
  m_m <- m[i,,]
  new_m_m <- calcMat(m_m)
}
)
Y <- array(unlist(a_list), dim = c(nrow(a_list[[1]]), ncol(a_list[[1]]), length(a_list)))
Y <- aperm(Y, c(3,1,2))
stopCluster(cl)
This second one takes around 36 seconds.
So I do not see any improvement in terms of time.
Does anyone know the reason for this?
You need to be aware of certain things when you want to use parallelization.
The first one is that there is an overhead due to communication and possibly serialization.
As a very crude example,
consider the following:
num_cores <- 2L
cl <- makeCluster(num_cores, type="FORK")
registerDoParallel(cl)
exec_time <- system.time({
  a_list <- foreach(i=1L:2L) %dopar% {
    system.time({
      m_m <- m[i,,]
      new_m_m <- calcMat(m_m)
    })
  }
})
On my system, exec_time shows an elapsed time of 1.264 seconds, whereas the elapsed times recorded in a_list each show 0.003 seconds.
So, in a very simplified way, we could say that 99.7% of the execution time was overhead.
This has to do with task granularity.
Different types of tasks benefit from different levels of granularity.
In your case, you can benefit from chunking your tasks coarsely:
grouping several tasks into each unit of work reduces the number of round trips to the workers and, with it, the communication overhead:
chunks <- splitIndices(N, num_cores)
str(chunks)
List of 2
$ : int [1:4096] 1 2 3 4 5 6 7 8 9 10 ...
$ : int [1:4096] 4097 4098 4099 4100 4101 4102 4103 4104 4105 4106 ...
Each chunk has indices for several tasks,
so you need to modify your code appropriately:
exec_time_chunking <- system.time({
  a_list <- foreach(chunk=chunks, .combine=c) %dopar% {
    lapply(chunk, function(i) {
      m_m <- m[i,,]
      calcMat(m_m)
    })
  }
})
The above completed in 17.978 seconds on my system, using 2 parallel workers.
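If you still need the full array back, a_list can be reshaped exactly as in your own code (a sketch, assuming a_list holds one M x F matrix per slice, in the original order; new_m_par is just an illustrative name):
# fold the per-slice results back into an N x M x F array
Y <- array(unlist(a_list), dim = c(M, F, N))
new_m_par <- aperm(Y, c(3, 1, 2))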
EDIT: As a side note, I think there's usually no good reason to set the number of parallel workers to detectCores() - 1L, since the main R process has to wait for all parallel workers to finish.
But maybe you have other reasons, perhaps maintaining system responsiveness.
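For instance, on a machine with nothing else to do, you could hand every core to the cluster (a minimal sketch reusing the FORK setup from above; whether to keep a core free is a judgment call):
# use every detected core for the FORK cluster
num_cores <- parallel::detectCores()
cl <- makeCluster(num_cores, type="FORK")
registerDoParallel(cl)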
I just noticed that your code works if you set the cluster type to "SOCK":
cl <- makeCluster(numberofcores, type = "SOCK")
Note: on Windows this did not work for me, so I used the doSNOW package instead (I have found it has better compatibility across operating systems).
The following runs much faster:
library(parallel)
library(doSNOW)
numberofcores = detectCores()  # check what number of cores is appropriate for your environment
cl <- makeCluster(numberofcores, type = "SOCK")
# Register the cluster so that foreach/%dopar% runs in parallel
registerDoSNOW(cl)
system.time( a_list <- foreach(i = 1:N) %dopar% {
  new_slice <- matrix(0, nrow=M, ncol=F)
  for(j in 1:F) {
    ma_r <- ma(m[i,,j],2)
    ma_r <- c(ma_r[-length(ma_r)], ma_r[(length(ma_r)-1)])
    new_slice[,j] <- ma_r
  }
  # return the slice: assigning into new_m inside %dopar% only modifies the
  # workers' copies and never reaches the master process
  new_slice
}
)
stopCluster(cl)