Split data set and pass the subsets in parallel to

Posted 2019-04-11 15:37

Question:

Here is what I am trying to do with the foreach package. I have a data set with 600 rows and 58,000 columns, and it contains many missing values.

We need to impute the missing values with the "missForest" package, which is not parallel, so running it on the whole data set at once takes too much time. I am therefore thinking of dividing the data into 7 data sets (I have 7 cores), each with the same number of rows (my lines) and a different subset of columns (markers), and then using %dopar% to pass the data sets to missForest in parallel.

I do not see how to divide the data into smaller data sets, pass those data sets to missForest, and then recombine the outputs.

I would appreciate it very much if you could show me how.

Here is a small example, from the BLR package, demonstrating my problem:

   library(BLR)
   library(missForest)
   data(wheat)
   X2 <- prodNA(X, 0.1)
   dim(X2)                 ## I need to divide X2 into 7 data frames (ii)

   X3 <- missForest(X2)

   X3$ximp  ## combine the ii data frames

Answer 1:

When processing a large matrix in parallel, it can be very important to pass each cluster worker only the data it needs. This isn't an issue with mclapply, whether you call it directly or indirectly through doParallel on Linux, because the workers are forked from the parent process. But on Windows, input data is sent to the cluster workers over socket connections, so the amount of data per worker matters.

For cases like this, I use the isplitCols function from the itertools package. It creates an iterator over blocks of columns of a matrix. Using the chunks argument, you can split the matrix so that each cluster worker gets exactly one submatrix.
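To see what the iterator hands out, here is a minimal sketch on a toy matrix. The matrix `m` and its column names are made up for illustration; only `isplitCols` and its `chunks` argument come from the answer itself.

```r
library(itertools)  # provides isplitCols; the iterators package comes with it

# Hypothetical toy matrix: 4 rows, 10 named columns
m <- matrix(seq_len(40), nrow = 4, dimnames = list(NULL, paste0("v", 1:10)))

# Drain the iterator into a list of column blocks
blocks <- as.list(isplitCols(m, chunks = 3))

sapply(blocks, ncol)                  # number of columns in each block
recombined <- do.call(cbind, blocks)  # blocks are produced in column order
stopifnot(identical(recombined, m))   # binding them back restores the matrix
```

Because the blocks are consecutive column ranges, a plain cbind of the results is enough to restore the original layout.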

Here's a translation of your example into foreach which uses isplitCols to split the input matrix into 7 submatrices, thus decreasing the data sent to each worker by a factor of seven compared to auto-exporting X2 to each worker:

library(doParallel)
library(itertools)
library(BLR)
library(missForest)
ncores <- 7
cl <- makePSOCKcluster(ncores)
registerDoParallel(cl)
data(wheat)
X2 <- prodNA(X, 0.1)
X3 <- foreach(m=isplitCols(X2, chunks=ncores), .combine='cbind',
              .packages='missForest') %dopar% {
  missForest(m)$ximp
}
print(X3)
stopCluster(cl)


Answer 2:

library(parallel)    # mclapply is provided by parallel, which superseded multicore
library(missForest)

n.cores <- 7
cuts <- cut(1:ncol(X2), n.cores)
X3 <- mclapply(levels(cuts), function(x) missForest(X2[,cuts == x])$ximp , mc.cores = n.cores)
X3 <- do.call(cbind, X3)

cut splits the column indices into 7 intervals, mclapply sends each block of columns to one of your 7 cores, and cbind joins the results at the end.
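A minimal base R sketch of the same split-and-recombine pattern, run on a small made-up matrix `m` and without the imputation step, shows how cut assigns the columns and that cbind restores them in order:

```r
# Hypothetical toy matrix standing in for X2: 4 rows, 10 columns
m <- matrix(seq_len(40), nrow = 4)
n.parts <- 3

# cut() labels each column index with one of n.parts contiguous intervals
cuts <- cut(seq_len(ncol(m)), n.parts)
table(cuts)  # how many columns fall into each interval

# Extract one block per interval; drop = FALSE keeps one-column blocks as matrices
blocks <- lapply(levels(cuts), function(l) m[, cuts == l, drop = FALSE])

# The intervals are contiguous and ordered, so cbind restores the original matrix
recombined <- do.call(cbind, blocks)
stopifnot(identical(recombined, m))
```

In the real workflow each block would go through missForest before the final cbind.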

Edit: adding my foreach implementation. Note: I have never used this package before, but it seems to do what I would expect.

library(doParallel)
library(foreach)
n.cores <- 7
cuts <- cut(1:ncol(X2), n.cores)

cl <- makeCluster(n.cores)
registerDoParallel(cl)

X3 <- foreach(x = levels(cuts), .combine = cbind, .multicombine = TRUE,
              .packages = "missForest") %dopar% {
  missForest(X2[, cuts == x])$ximp
}

stopCluster(cl)  # release the workers when done


Answer 3:

You have to split your matrix into ncores parts and then combine them again. Since you're using Random Forest, you can split the columns at random (and do it several times to check and validate the results).

ncores = 7

split = sample(seq(ncores), size=ncol(X2), replace=TRUE) # random partitioning of the columns

# assumes a parallel backend has already been registered (e.g. with registerDoParallel)
X3 = foreach(i=seq_len(ncores), .combine=cbind, .inorder=FALSE,
             .packages='missForest') %dopar% {
  ind = which(split==i) # the selected columns for this core
  rbind(ind, missForest(X2[,ind])$ximp) # add the column index as first row!
}

ind = X3[1,] # get all the index back
ind = sort(ind, index.return=TRUE)$ix # sort the index to recover the original column order
X3 = X3[-1,ind] # remove the index row and reorder the columns
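
The index-row trick can be checked on its own with a small base R sketch. The toy matrix `m` and the names `n_parts` and `part` are hypothetical, the imputation step is left out, and the partition here uses shuffled balanced labels rather than sampling with replacement, which is an assumption made so that no part can end up empty:

```r
set.seed(42)
m <- matrix(rnorm(40), nrow = 4)  # hypothetical 4 x 10 stand-in for X2
n_parts <- 3

# Shuffle balanced labels so every part receives at least one column
part <- sample(rep_len(seq_len(n_parts), ncol(m)))

# Each block carries its original column indices in an extra first row
blocks <- lapply(seq_len(n_parts), function(i) {
  ind <- which(part == i)
  rbind(ind, m[, ind, drop = FALSE])
})

# Bind the blocks in reverse order to mimic results arriving out of order
shuffled <- do.call(cbind, rev(blocks))

# Sort by the index row, then drop it
ord <- order(shuffled[1, ])
restored <- shuffled[-1, ord, drop = FALSE]
dimnames(restored) <- NULL  # rbind added row names; strip them before comparing
stopifnot(identical(restored, m))
```

If the matrix has unique column names, reordering the combined result with those names is a simpler alternative to carrying an index row.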