Problem Description:
I have a big matrix c
, loaded in RAM memory. My goal is through parallel processing to have read only access to it. However when I create the connections either I use doSNOW
, doMPI
, big.matrix
, etc the amount to ram used increases dramatically.
Is there a way to properly create a shared memory, where all the processes may read from, without creating a local copy of all the data?
Example:
libs<-function(libraries){# Installs missing libraries and then load them
for (lib in libraries){
if( !is.element(lib, .packages(all.available = TRUE)) ) {
install.packages(lib)
}
library(lib,character.only = TRUE)
}
}
libra<-list("foreach","parallel","doSNOW","bigmemory")
libs(libra)
#create a matrix of size 1GB aproximatelly
c<-matrix(runif(10000^2),10000,10000)
#convert it to bigmatrix
x<-as.big.matrix(c)
# get a description of the matrix
mdesc <- describe(x)
# Create the required connections
cl <- makeCluster(detectCores ())
registerDoSNOW(cl)
out<-foreach(linID = 1:10, .combine=c) %dopar% {
#load bigmemory
require(bigmemory)
# attach the matrix via shared memory??
m <- attach.big.matrix(mdesc)
#dummy expression to test data aquisition
c<-m[1,1]
}
closeAllConnections()
RAM:
in the image above, you may find that the memory increases a lot until foreach
ends and it is freed.
I think the solution to the problem can be seen from the post of Steve Weston, the author of the foreach
package, here. There he states:
The doParallel package will auto-export variables to the workers that are referenced in the foreach loop.
So I think the problem is that in your code your big matrix c
is referenced in the assignment c<-m[1,1]
. Just try xyz <- m[1,1]
instead and see what happens.
Here is an example with a file-backed big.matrix
:
#create a matrix of size 1GB aproximatelly
n <- 10000
m <- 10000
c <- matrix(runif(n*m),n,m)
#convert it to bigmatrix
x <- as.big.matrix(x = c, type = "double",
separated = FALSE,
backingfile = "example.bin",
descriptorfile = "example.desc")
# get a description of the matrix
mdesc <- describe(x)
# Create the required connections
cl <- makeCluster(detectCores ())
registerDoSNOW(cl)
## 1) No referencing
out <- foreach(linID = 1:4, .combine=c) %dopar% {
t <- attach.big.matrix("example.desc")
for (i in seq_len(30L)) {
for (j in seq_len(m)) {
y <- t[i,j]
}
}
return(0L)
}
## 2) Referencing
out <- foreach(linID = 1:4, .combine=c) %dopar% {
invisible(c) ## c is referenced and thus exported to workers
t <- attach.big.matrix("example.desc")
for (i in seq_len(30L)) {
for (j in seq_len(m)) {
y <- t[i,j]
}
}
return(0L)
}
closeAllConnections()
Alternatively, if you are on Linux/Mac and you want a CoW shared memory, use forks. First load all your data into the main thread, and then launch working threads (forks) with general function mcparallel
from the parallel
package.
You can collect their results with mccollect
or with the use of truly shared memory using the Rdsm
library, like this:
library(parallel)
library(bigmemory) #for shared variables
shared<-bigmemory::big.matrix(nrow = size, ncol = 1, type = 'double')
shared[1]<-1 #Init shared memory with some number
job<-mcparallel({shared[1]<-23}) #...change it in another forked thread
shared[1,1] #...and confirm that it gets changed
# [1] 23
You can confirm, that the value really gets updated in backgruound, if you delay the write:
fn<-function()
{
Sys.sleep(1) #One second delay
shared[1]<-11
}
job<-mcparallel(fn())
shared[1] #Execute immediately after last command
# [1] 23
aaa[1,1] #Execute after one second
# [1] 11
mccollect() #To destroy all forked processes (and possibly collect their output)
To control for concurency and avoid race conditions use locks:
library(synchronicity) #for locks
m<-boost.mutex() #Lets create a mutex "m"
bad.incr<-function() #This function doesn't protect the shared resource with locks:
{
a<-shared[1]
Sys.sleep(1)
shared[1]<-a+1
}
good.incr<-function()
{
lock(m)
a<-shared[1]
Sys.sleep(1)
shared[1]<-a+1
unlock(m)
}
shared[1]<-1
for (i in 1:5) job<-mcparallel(bad.incr())
shared[1] #You can verify, that the value didn't get increased 5 times due to race conditions
mccollect() #To clear all threads, not to get the values
shared[1]<-1
for (i in 1:5) job<-mcparallel(good.incr())
shared[1] #As expected, eventualy after 5 seconds of waiting you get the 6
#[1] 6
mccollect()
Edit:
I simplified dependencies a bit by exchanging Rdsm::mgrmakevar
into bigmemory::big.matrix
. mgrmakevar
internally calls big.matrix
anyway, and we don't need anything more.