I am currently experimenting with parallel computing in R.
I am trying to train a logistic ridge model, and my computer has 4 cores. I would like to split my data set equally into 4 pieces, use each core to train a model on its piece of the training data, and save the result from each core into a single vector. The problem is that I have no clue how to do it. So far I have tried to parallelize with the foreach package, but each core sees the same training data. Here is the code with the foreach package (which doesn't split the data):
library(ridge)
library(parallel)
library(foreach)
num_of_cores <- detectCores()
mydata <- read.csv("http://www.ats.ucla.edu/stat/data/binary.csv")
data_per_core <- floor(nrow(mydata)/num_of_cores)
result <- data.frame()
r <- foreach(icount(4), .combine = cbind) %dopar% {
result <- logisticRidge(admit~ gre + gpa + rank,data = mydata)
coefficients(result)
}
Any idea how to simultaneously split the data into x chunks and train the models in parallel?
How about something like this? It uses snowfall instead of the foreach library, but should give the same results.
library(snowfall)
library(ridge)
# for reproducibility
set.seed(123)
num_of_cores <- parallel::detectCores()
mydata <- read.csv("http://www.ats.ucla.edu/stat/data/binary.csv")
data_per_core <- floor(nrow(mydata)/num_of_cores)
# assign each row at random to one chunk via sampleid (chunk sizes will differ slightly)
mydata$sampleid <- sample(1:num_of_cores, nrow(mydata), replace = TRUE)
# create a small function that calculates the coefficients
regfun <- function(dat) {
library(ridge) # this has to be in the function, otherwise the snowfall workers don't know the logisticRidge function
result <- logisticRidge(admit~ gre + gpa + rank, data = dat)
coefs <- as.numeric(coefficients(result))
return(coefs)
}
# prepare the data
datlist <- lapply(1:num_of_cores, function(i){
dat <- mydata[mydata$sampleid == i, ]
})
# initiate the clusters
sfInit(parallel = TRUE, cpus = num_of_cores)
# export the function to the cluster (the data is passed as an argument below)
sfExport("regfun")
# calculate (sfClusterApply works much like lapply and returns a list)
res <- sfClusterApply(datlist, function(datlist.element) {
regfun(dat = datlist.element)
})
# stop the cluster
sfStop()
# convert the list to a data.frame with one row per chunk; do.call(rbind, res) builds the same matrix
res <- data.frame(t(matrix(unlist(res), ncol = num_of_cores)))
names(res) <- c("intercept", "gre", "gpa", "rank")
res
# res
#   intercept          gre           gpa         rank
# 1 -3.002592 1.558363e-03  0.7048146997 -0.382462408
# 2 -4.142939 1.060692e-03  0.9978841880 -0.314589628
# 3 -2.967130 2.315487e-03  0.6797382218 -0.464219036
# 4 -1.176943 4.786894e-05 -0.0004576679 -0.007618317
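Note that the random assignment via sampleid draws chunk labels with replacement, so the chunks will generally not have the same number of rows. If equal-sized pieces matter, as the question asks, one option is to shuffle a balanced vector of labels and split on it. This is only a minimal sketch in base R; `toy`, `n` and `k` are made-up stand-ins for mydata, its row count and the number of cores:

```r
# Hypothetical toy data standing in for mydata (values are simulated)
set.seed(123)
n <- 400   # assumed number of rows
k <- 4     # assumed number of chunks / cores
toy <- data.frame(id = seq_len(n), x = rnorm(n))

# Balanced labels 1..k, shuffled so rows land in chunks at random
chunk_id <- sample(rep_len(seq_len(k), n))

# One data frame per chunk; this list could replace datlist
chunks <- split(toy, chunk_id)

# 100 rows per chunk here, since 4 divides 400
sapply(chunks, nrow)
```

When k does not divide n evenly, the chunk sizes produced this way differ by at most one row.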
The itertools package provides a number of functions for iterating over various data structures with foreach loops. In this case, you could use the isplitRows function to split the data frame row-wise into one chunk per worker:
library(ridge)
library(doParallel)
library(itertools)
num_of_cores <- detectCores()
cl <- makePSOCKcluster(num_of_cores)
registerDoParallel(cl)
mydata <- read.csv("http://www.ats.ucla.edu/stat/data/binary.csv")
r <- foreach(d=isplitRows(mydata, chunks=num_of_cores),
.combine = cbind, .packages="ridge") %dopar% {
result <- logisticRidge(admit~ gre + gpa + rank, data = d)
coefficients(result)
}
# shut down the workers when finished
stopCluster(cl)
isplitRows also takes a chunkSize argument if you want to control the maximum size of each chunk.
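To see what chunkSize does without starting a cluster, you can collect the iterator into a list and look at the row counts. This is just a rough sketch on a made-up data frame (`toy` is hypothetical, not the admissions data):

```r
library(itertools)

# Hypothetical toy data frame with 10 rows
toy <- data.frame(id = 1:10, x = rnorm(10))

# Ask for chunks of at most 4 rows each and collect them into a list
pieces <- as.list(isplitRows(toy, chunkSize = 4))

# Row count of each chunk; none should exceed chunkSize
sapply(pieces, nrow)
```

In a foreach loop you would pass isplitRows(toy, chunkSize = 4) directly as the loop variable instead of collecting it first.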
Note that with this technique, each worker receives only its own fraction of mydata. This is particularly important for larger data frames with a PSOCK cluster, because the workers are separate processes and any data they use has to be serialized and sent to them.
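The same idea of shipping only one chunk to each PSOCK worker can also be tried with nothing but the base parallel package. The sketch below is an illustration under several assumptions: the data is simulated, glm with a binomial family is swapped in for logisticRidge so that no extra package is needed, and two workers are used regardless of how many cores you have:

```r
library(parallel)

# Simulated toy data (not the admissions data set)
set.seed(1)
n <- 200
toy <- data.frame(x = rnorm(n))
toy$y <- rbinom(n, 1, plogis(0.5 * toy$x))

# Two chunks, rows assigned alternately
chunks <- split(toy, rep_len(1:2, n))

# parLapply sends each worker only the list elements it has to process
cl <- makePSOCKcluster(2)
coefs <- parLapply(cl, chunks, function(d) {
  # glm is a stand-in for logisticRidge here
  coef(glm(y ~ x, data = d, family = binomial))
})
stopCluster(cl)

# One column of coefficients per chunk, like .combine = cbind
coef_mat <- do.call(cbind, coefs)
coef_mat
```

To fit the ridge model instead, the worker function would need to load the ridge package itself, much like regfun does in the snowfall answer.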