How to set up doSNOW and SOCK cluster with Torque/

Posted 2019-02-10 23:52

In continuation of this question (https://stackoverflow.com/questions/17222942/allow-foreach-workers-to-register-and-distribute-sub-tasks-to-other-workers), what is a best practice for connecting doSNOW and a SOCK cluster to the Torque/MOAB scheduler, so as to avoid processor affinity in an inner parallel loop that handles part of the work of an outer parallel loop?

From Steve's answer to that question, the baseline code without interaction with the scheduler could be:

library(doSNOW)
hosts <- c('host-1', 'host-2')
cl <- makeSOCKcluster(hosts)
registerDoSNOW(cl)
r <- foreach(i=1:4, .packages='doMC') %dopar% {
  registerDoMC(2)
  foreach(j=1:8, .combine='c') %dopar% {
    i * j
  }
}
stopCluster(cl)  

1 answer
叛逆
#2 · 2019-02-11 00:46

Torque always creates a file containing the node names that have been allocated to your job by Moab, and it passes the path of that file to your job via the PBS_NODEFILE environment variable. Node names may be listed multiple times to indicate that it allocated multiple cores to your job on that node. In this case, we want to start a cluster worker for each unique node name in PBS_NODEFILE, but keep track of the number of allocated cores on each of those nodes so we can specify the correct number of cores when registering doMC.

Here is a function that reads PBS_NODEFILE and returns a data frame with the allocated node information:

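getnodes <- function() {
  f <- Sys.getenv('PBS_NODEFILE')
  # Outside of Torque, fall back to a single local node with three cores
  x <- if (nzchar(f)) readLines(f) else rep('localhost', 3)
  # Count how many times each node name is listed
  as.data.frame(table(x), stringsAsFactors=FALSE)
}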

The returned data frame contains a column named "x" of node names and a column named "Freq" of corresponding core counts.
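To see the shape of that data frame without submitting a job, the function can be exercised against a hand-written node file. This is only a minimal sketch: it assumes a temporary file can stand in for the one Torque would create, and the host names are made up.

```r
# Same getnodes() as above, repeated so this snippet runs on its own.
getnodes <- function() {
  f <- Sys.getenv('PBS_NODEFILE')
  x <- if (nzchar(f)) readLines(f) else rep('localhost', 3)
  as.data.frame(table(x), stringsAsFactors=FALSE)
}

# Hypothetical node file: host-1 is listed three times, host-2 once.
fake <- tempfile()
writeLines(c('host-1', 'host-1', 'host-2', 'host-1'), fake)

# Remember any existing value so it can be restored afterwards.
old <- Sys.getenv('PBS_NODEFILE', unset=NA)
Sys.setenv(PBS_NODEFILE=fake)

nodes <- getnodes()
print(nodes)

# Restore the environment so a real PBS_NODEFILE is not clobbered.
if (is.na(old)) Sys.unsetenv('PBS_NODEFILE') else Sys.setenv(PBS_NODEFILE=old)
unlink(fake)
```

Each unique host appears once in "x", and "Freq" holds the number of times it was listed in the file.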

This makes it simple to create and register a SOCK cluster with one worker per unique node:

nodes <- getnodes()
cl <- makeSOCKcluster(nodes$x)
registerDoSNOW(cl)

We can now easily execute a foreach loop with one task per worker, but it's not so easy to pass the correct number of allocated cores to each of those workers without depending on some implementation details of both snow and doSNOW, specifically relating to the implementation of the clusterApplyLB function used by doSNOW. Of course, it's easy if you happen to know that the number of allocated cores is the same on each node, but it's harder if you want a general solution to the problem.
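Whether a given allocation falls into the easy case can be checked directly from the data frame. The following is a rough sketch only: uniformcores is a hypothetical helper (not part of snow or doSNOW), and the two data frames are invented allocations in the same format that getnodes returns.

```r
# Hypothetical helper: returns the common core count if every node has
# the same allocation, otherwise NA.
uniformcores <- function(nodes) {
  cores <- unique(nodes$Freq)
  if (length(cores) == 1) cores else NA_integer_
}

# Made-up allocations for illustration.
even <- data.frame(x=c('host-1', 'host-2'), Freq=c(8L, 8L),
                   stringsAsFactors=FALSE)
uneven <- data.frame(x=c('host-1', 'host-2'), Freq=c(8L, 4L),
                     stringsAsFactors=FALSE)

uniformcores(even)    # every node has the same count
uniformcores(uneven)  # counts differ, so the general solution is needed
```

When the result is not NA, that single number could be used as the core count on every worker; otherwise each worker needs its own value.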

One (not so elegant) general solution is to assign the number of allocated cores to a global variable on each of the workers via the snow clusterApply function:

setcores <- function(cl, nodes) {
  f <- function(cores) assign('allocated.cores', cores, pos=.GlobalEnv)
  clusterApply(cl, nodes$Freq, f)
}
setcores(cl, nodes)

This guarantees that the value of the "allocated.cores" variable on each of the workers is equal to the number of times that that node appeared in PBS_NODEFILE.
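One way to check this is to read the variable back from every worker. The sketch below assumes two local workers can stand in for the allocated nodes, and the core counts are invented; clusterEvalQ is the snow function that evaluates an expression on each worker.

```r
library(snow)

# Invented allocation: two "nodes", both really localhost, with
# different core counts so that a mix-up would be visible.
nodes <- data.frame(x=c('localhost', 'localhost'), Freq=c(2L, 4L),
                    stringsAsFactors=FALSE)
cl <- makeSOCKcluster(nodes$x)

# Same setcores() as above, repeated so this snippet runs on its own.
setcores <- function(cl, nodes) {
  f <- function(cores) assign('allocated.cores', cores, pos=.GlobalEnv)
  clusterApply(cl, nodes$Freq, f)
}
setcores(cl, nodes)

# Read the global variable back from each worker, in worker order.
seen <- unlist(clusterEvalQ(cl, allocated.cores))
print(data.frame(node=nodes$x, expected=nodes$Freq, seen=seen))

stopCluster(cl)
```

The "expected" and "seen" columns should match row by row, because clusterApply hands the i-th element of nodes$Freq to the i-th worker when there are no more elements than workers.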

Now we can use that global variable when registering doMC:

r <- foreach(i=seq_along(nodes$x), .packages='doMC') %dopar% {
  registerDoMC(allocated.cores)
  foreach(j=1:allocated.cores, .combine='c') %dopar% {
    i * j
  }
}

Here is an example job script that could be used to execute this R script:

#!/bin/sh
#PBS -l nodes=4:ppn=8
cd "$PBS_O_WORKDIR"
R --slave -f hybridSOCK.R

When this is submitted via the qsub command, the R script will create a SOCK cluster with four workers, and each of those workers will execute the inner foreach loop using 8 cores. But since the R code is general, it should do the right thing regardless of the resources requested via qsub.
