R parallel - parRapply not working properly

2019-08-13 09:56发布

问题:

I'm doing some unit testing on a package in development. One of the tests is failing. Specifically, I have parallel versions of the code and non-parallel versions. The non-parallel version works perfectly. The parallel version fails a unit test, with a seemingly nonsensical error.

## load my development package.
library(devtools) # for install_github
install_github("alexwhitworth/imputation")

## do some setup:
library(imputation)
library(kernlab)
library(parallel)


x1 <- matrix(rnorm(200), 20, 10)
x1[x1 > 1.25] <- NA
x3 <- create_canopies(x1, n_canopies= 5, q= 2)
prelim <- imputation:::impute_prelim(x3[[1]], parallel= TRUE, leave_cores= 1)

opt_h <- (4 * sd(x3[[1]][, -ncol(x3[[1]])], na.rm=T)^5 / (3 * nrow(x3[[1]])))^(1/5)
kern <- rbfdot(opt_h)


## write 2 identical functions:
## one in parallel
## one not in parallel

foo_parallel <- function(x_missing, x_complete, k, q, leave_cores) {
  cl <- makeCluster(detectCores() - leave_cores)
  x_missing_imputed <- parRapply(cl= cl, x_missing, function(i, x_complete) {
    rowID = as.numeric(i[1])
    i_original = unlist(i[-1])
    x_comp_rowID <- which(as.integer(rownames(x_complete)) == rowID)
    missing_cols <- which(is.na(x_complete[x_comp_rowID,]))

    # calculate distances
    distances <- imputation:::dist_q.matrix(x=rbind(x_complete[x_comp_rowID, ], 
                                                    x_complete[-x_comp_rowID,]), ref= 1L,  q= q)
    return(distances)
  }, x_complete= x_complete)
  stopCluster(cl)
  return(x_missing_imputed)
}

foo_nonparallel <- function(x_missing, x_complete, k, q) {
  x_missing_imputed <- t(apply(x_missing, 1, function(i, x_complete) {
    rowID = as.numeric(i[1])
    i_original = unlist(i[-1])
    x_comp_rowID <- which(as.integer(rownames(x_complete)) == rowID)
    missing_cols <- which(is.na(x_complete[x_comp_rowID,]))

    # calculate distances
    distances <- imputation:::dist_q.matrix(x=rbind(x_complete[x_comp_rowID, ], 
                                                    x_complete[-x_comp_rowID,]), ref= 1L,  q= q)
    return(distances)
  }, x_complete= x_complete))
  return(x_missing_imputed)
}

## test them
foo_parallel(prelim$x_missing, x3[[1]],k=3,q=2, leave_cores= 1) # fails
foo_nonparallel(prelim$x_missing, x3[[1]],k=3,q=2) # works

Error in checkForRemoteErrors(val) : 2 nodes produced errors; first error: ref must be an integer in {1, nrow(x)}.

As you can see, ref is clearly defined as ref= 1L which is in 1, nrow(x).

What is going on with the interaction with library(parallel) that is causing this error?

Edit - I'm on a windows machine:

R> sessionInfo()
R version 3.2.2 (2015-08-14)
Platform: x86_64-w64-mingw32/x64 (64-bit)
Running under: Windows 7 x64 (build 7601) Service Pack 1

回答1:

I have figured out what is causing the problem. This seems to me to be a library(parallel) bug / edge-case, specific to the parallelized versions of the apply functions (in this case parRapply). Perhaps someone older and wiser can explain why there isn't a catch in library(parallel) for this edge case.

The issue seems to be related to the number of tasks vs the number of workers available. On my machine, I have an 8-core processor. In this case, there are 5 tasks (one for each row of prelim$x_missing).

Granted, in typical use, I wouldn't be parallelizing work for 5 rows. This is just a unit test.

R> prelim$x_missing
              X1         X2         X3         X4         X5          X6         X7         X8          X9        X10 d_factor
6   6  0.2604170 -0.5966874         NA         NA -0.3013053  0.24313272  0.2836760  0.3977164 -0.60711109 -0.2929253        1
7   7 -0.8540576  0.1409047         NA  0.4801685 -0.9324517 -0.06487733 -0.2220201         NA  1.19077335 -0.3702607        2
8   8  0.5118453 -0.8750674         NA  0.1787238  0.6897163  0.20695122         NA -0.3488021  0.84200408 -0.4791230        1
12 12  0.3695746 -0.4919277 -1.2509180  1.1642152         NA  0.04018417         NA         NA -0.53436589 -1.5400345        2
15 15         NA -0.3608242 -0.6761515 -0.5366562  0.1763501          NA         NA  0.4967595  0.02635203 -0.6015536        1

Note that I am making the cluster via cl <- parallel::makeCluster(detectCores() - leave_cores) where detectCores() will return 8 for my current machine. The function call accepts a parameter for the number of cores to leave open leave_cores. When I make a cluster with more cores/nodes than rows in the use-case, the function fails. When I make a cluster with <= number of rows, the function works:

 # works : detectCores() == 8, 8 - 3 == 5 (number of rows / processes)
R> foo_parallel(prelim$x_missing, x3[[1]],k=3,q=2, leave_cores= 3)
 [1] 1.0216313 0.7355635 0.9201501 0.6906554 0.6613939 0.3628872 0.9995641 0.8571252 0.9271800 0.9201501 0.9238215 0.9798824 0.9059506
[14] 0.6891484 1.0158223 0.5442953 0.6906554 0.9238215 0.8607280 0.5897955 1.1084943 0.8518322 0.9227102 0.6613939 0.9798824 0.8607280
[27] 0.9518105 0.9792209 1.1968528 0.4447104 0.3628872 0.9059506 0.5897955 0.9518105 1.1249624

# fails : 8-2 = 6; 6 > nrow(prelim$x_missing)
R> foo_parallel(prelim$x_missing, x3[[1]],k=3,q=2, leave_cores= 2) 
Error in checkForRemoteErrors(val) : 
  one node produced an error: ref must be an integer in {1, nrow(x)}. 

tl,dr

As described in the rparallel vignette, detectCores is used to simply detect the cores, it very reasonably does not attempt to do any intelligent assignment of tasks to workers.

function detectCores() tries to determine the number of CPU cores in the machine on which R is running: it has ways to do so on all known current R platforms. What exactly it measures is OS-specific: we try where possible to report the number of physical cores available. On Windows the default is to report the number of logical CPUs. On modern hardware (e.g. Intel Core i7 ) the latter may not be unreasonable as hyper-threading does give a significant extra throughput.

I am calling the function parallel::parRapply to do the computation. parRapply dispatches the work to the workers via the splitRows function. But there doesn't seem to be any intelligence or error-catching to the splitRows function.

R> parRapply
function (cl = NULL, x, FUN, ...) 
{
    cl <- defaultCluster(cl)
    do.call(c, clusterApply(cl = cl, x = splitRows(x, length(cl)), 
        fun = apply, MARGIN = 1L, FUN = FUN, ...), quote = TRUE)
}
<bytecode: 0x00000000380ca530>
<environment: namespace:parallel>

I can't find the source code for splitRows but parallel::splitIndices seems similar:

R> parallel:::splitIndices
function (nx, ncl) 
{
    i <- seq_len(nx)
    if (ncl == 0L) 
        list()
    else if (ncl == 1L || nx == 1L) 
        list(i)
    else {
        fuzz <- min((nx - 1L)/1000, 0.4 * nx/ncl)
        breaks <- seq(1 - fuzz, nx + fuzz, length = ncl + 1L)
        structure(split(i, cut(i, breaks)), names = NULL)
    }
}
<bytecode: 0x00000000380a7828>
<environment: namespace:parallel>

In my unit test, this would execute as the following:

# all 8 cores:
nx <- 5; ncl <- 8
i <- seq_len(nx)
fuzz <- min((nx - 1L)/1000, 0.4 * nx / ncl)
breaks <- seq(1 - fuzz, nx + fuzz, length= ncl + 1L)
structure(split(i, cut(i, breaks)), names = NULL)
[[1]]
[1] 1

[[2]]
integer(0)

[[3]]
[1] 2

[[4]]
integer(0)

[[5]]
[1] 3

[[6]]
[1] 4

[[7]]
integer(0)

[[8]]
[1] 5

Where there are 3 integer(0)s, which cause failure further down the call stack.

# 3 cores (just showing the return):
structure(split(i, cut(i, breaks)), names = NULL)
[[1]]
[1] 1 2

[[2]]
[1] 3

[[3]]
[1] 4 5

If anyone can provide a link in the comments below to the source code for splitRows, I'll happily update this answer. Code for parallel::clusterApply and parallel:::staticClusterApply are easily found