Here is what I am trying to do using the foreach package.
I have a data set with 600 rows and 58000 columns, with lots of missing values.
We need to impute the missing values using a package called "missForest". It does not run in parallel, so it takes too much time to process the whole data set at once.
So I am thinking of dividing the data into 7 data sets (I have 7 cores), each with the same number of rows (my lines) and a different subset of the columns (markers), and then using %dopar% to pass the data sets to missForest in parallel.
I do not see how to divide the data into smaller data sets, pass those data sets to missForest, and then recombine the outputs.
I would appreciate it very much if you could show me how.
Here is a small example, using data from the BLR package, demonstrating my problem:
library(BLR)
library(missForest)
data(wheat)
X2 <- prodNA(X, 0.1)   # introduce 10% missing values
dim(X2)                # I need to divide X2 into 7 data frames (ii)
X3 <- missForest(X2)
X3$ximp                # combine the imputed results of the ii data frames
When processing a large matrix in parallel, it can be very important to pass only as much data as each cluster worker needs. This isn't an issue when using mclapply, either directly or indirectly via doParallel on Linux. But on Windows, input data is sent to the cluster workers over socket connections, so the amount of data transferred matters.
For cases like this, I use the isplitCols function from the itertools package. It creates an iterator over blocks of columns of a matrix. Using the chunks argument, you can split the matrix so that each cluster worker gets exactly one submatrix.
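As a quick illustration of what the iterator yields, here is a minimal sketch on a toy matrix. The matrix m and the choice of 3 chunks are made up for the example and are not taken from your data:

```r
library(itertools)  # depends on the iterators package

m <- matrix(1:20, nrow = 2)           # toy 2 x 10 matrix (illustrative only)
it <- isplitCols(m, chunks = 3)       # iterator over 3 blocks of columns
blocks <- as.list(it)                 # drain the iterator into a list
block_widths <- sapply(blocks, ncol)  # number of columns in each block
rebuilt <- do.call(cbind, blocks)     # put the blocks back together in order
```

Each element of blocks keeps all rows of m, so binding the blocks column-wise in order should give back the original matrix.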
Here's a translation of your example into foreach which uses isplitCols to split the input matrix into 7 submatrices, thus decreasing the data sent to each worker by a factor of seven compared to auto-exporting X2 to each worker:
library(doParallel)
library(itertools)
library(BLR)
library(missForest)
ncores <- 7
cl <- makePSOCKcluster(ncores)
registerDoParallel(cl)
data(wheat)
X2 <- prodNA(X, 0.1)
X3 <- foreach(m=isplitCols(X2, chunks=ncores), .combine='cbind',
              .packages='missForest') %dopar% {
  missForest(m)$ximp
}
print(X3)
stopCluster(cl)
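library(parallel)  # mclapply now lives in parallel; multicore is no longer on CRAN
n.cores <- 7
cuts <- cut(1:ncol(X2), n.cores)  # assign each column to one of 7 intervals
X3 <- mclapply(levels(cuts),
               function(x) missForest(X2[, cuts == x])$ximp,
               mc.cores = n.cores)
X3 <- do.call(cbind, X3)          # recombine the imputed blocks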
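Use cut to split the columns into 7 intervals, then mclapply to send each block to one of your 7 cores, and cbind the results together at the end. Note that mclapply relies on forking, so it does not run in parallel on Windows.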
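If you would rather work with the column indices directly, one alternative (swapped in here, not what the code above uses) is splitIndices from the parallel package, which returns a list of consecutive index blocks. A minimal sketch, with n_cols and n_groups as made-up values:

```r
library(parallel)

n_cols <- 10    # hypothetical number of columns
n_groups <- 3   # hypothetical number of workers
idx <- splitIndices(n_cols, n_groups)  # list of consecutive index blocks
group_sizes <- lengths(idx)            # how many columns each worker would get
```

Each element of idx could then be used as X2[, idx[[i]], drop = FALSE] in place of the cuts == x test.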
Edit: adding my foreach implementation. Note: I have never used this package before, but it seems to do what I would expect.
library(doParallel)
library(foreach)
n.cores <- 7
cuts <- cut(1:ncol(X2), n.cores)
cl <- makeCluster(n.cores)
registerDoParallel(cl)
X3 <- foreach(x = levels(cuts), .combine = cbind, .multicombine = TRUE,
              .packages = "missForest") %dopar% {
  missForest(X2[, cuts == x])$ximp
}
stopCluster(cl)  # release the workers when done
You have to split your matrix into ncores parts and then combine them again. Since you're using Random Forest, you can split the columns at random (and do it several times to check and validate the results).
ncores = 7
split = sample(seq(ncores), size=ncol(X2), replace=TRUE) # random partitioning of the columns
X3 = foreach(i=seq_len(ncores), .combine=cbind, .inorder=FALSE,
             .packages='missForest') %dopar% {
  ind = which(split==i) # the selected columns for this core
  rbind(ind, missForest(X2[, ind, drop=FALSE])$ximp) # add the column index as first row!
}
ind = X3[1,] # get all the column indices back
ind = order(ind) # permutation that recovers the original column order
X3 = X3[-1,ind] # remove the index row and reorder the columns
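To check the bookkeeping without running missForest, here is a minimal sketch of the same index-row trick on a toy matrix. The names m, grp and parts are made up for the example, and the partition is drawn as a shuffled rep_len (an assumption on my part, so that no group ends up empty) rather than with replace=TRUE:

```r
set.seed(1)
m <- matrix(rnorm(12), nrow = 2)              # toy 2 x 6 matrix (illustrative only)
n_groups <- 3
grp <- sample(rep_len(seq_len(n_groups), ncol(m)))  # balanced random partition

# Stand-in for the foreach loop: carry each block's column indices as a first row
parts <- lapply(seq_len(n_groups), function(i) {
  ind <- which(grp == i)
  rbind(ind, m[, ind, drop = FALSE])
})

shuffled <- do.call(cbind, parts)             # columns are now in group order
restored <- shuffled[-1, order(shuffled[1, ]), drop = FALSE]
restored <- unname(restored)                  # drop the row names added by rbind
```

If the reordering is right, restored should match m column for column.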