I'm doing some unit testing on a package in development. One of the tests is failing. Specifically, I have parallel and non-parallel versions of the same code. The non-parallel version works perfectly; the parallel version fails a unit test with a seemingly nonsensical error.
## load my development package.
library(devtools) # for install_github
install_github("alexwhitworth/imputation")
## do some setup:
library(imputation)
library(kernlab)
library(parallel)
x1 <- matrix(rnorm(200), 20, 10)
x1[x1 > 1.25] <- NA
x3 <- create_canopies(x1, n_canopies= 5, q= 2)
prelim <- imputation:::impute_prelim(x3[[1]], parallel= TRUE, leave_cores= 1)
opt_h <- (4 * sd(x3[[1]][, -ncol(x3[[1]])], na.rm=T)^5 / (3 * nrow(x3[[1]])))^(1/5)
kern <- rbfdot(opt_h)
## write 2 identical functions:
## one in parallel
## one not in parallel
foo_parallel <- function(x_missing, x_complete, k, q, leave_cores) {
  cl <- makeCluster(detectCores() - leave_cores)
  x_missing_imputed <- parRapply(cl= cl, x_missing, function(i, x_complete) {
    rowID = as.numeric(i[1])
    i_original = unlist(i[-1])
    x_comp_rowID <- which(as.integer(rownames(x_complete)) == rowID)
    missing_cols <- which(is.na(x_complete[x_comp_rowID, ]))
    # calculate distances from this row to the complete rows
    distances <- imputation:::dist_q.matrix(x= rbind(x_complete[x_comp_rowID, ],
                                                     x_complete[-x_comp_rowID, ]), ref= 1L, q= q)
    return(distances)
  }, x_complete= x_complete)
  stopCluster(cl)
  return(x_missing_imputed)
}
foo_nonparallel <- function(x_missing, x_complete, k, q) {
  x_missing_imputed <- t(apply(x_missing, 1, function(i, x_complete) {
    rowID = as.numeric(i[1])
    i_original = unlist(i[-1])
    x_comp_rowID <- which(as.integer(rownames(x_complete)) == rowID)
    missing_cols <- which(is.na(x_complete[x_comp_rowID, ]))
    # calculate distances from this row to the complete rows
    distances <- imputation:::dist_q.matrix(x= rbind(x_complete[x_comp_rowID, ],
                                                     x_complete[-x_comp_rowID, ]), ref= 1L, q= q)
    return(distances)
  }, x_complete= x_complete))
  return(x_missing_imputed)
}
## test them
foo_parallel(prelim$x_missing, x3[[1]], k=3, q=2, leave_cores= 1) # fails
Error in checkForRemoteErrors(val) :
  2 nodes produced errors; first error: ref must be an integer in {1, nrow(x)}.
foo_nonparallel(prelim$x_missing, x3[[1]], k=3, q=2) # works
As you can see, ref is clearly defined as ref= 1L, which is in {1, nrow(x)}. What is going on in the interaction with library(parallel) that is causing this error?
Edit: I'm on a Windows machine:
R> sessionInfo()
R version 3.2.2 (2015-08-14)
Platform: x86_64-w64-mingw32/x64 (64-bit)
Running under: Windows 7 x64 (build 7601) Service Pack 1
I have figured out what is causing the problem. This seems to me to be a library(parallel) bug / edge case, specific to the parallelized versions of the apply functions (in this case parRapply). Perhaps someone older and wiser can explain why there isn't a catch in library(parallel) for this edge case.
The issue is the number of tasks versus the number of workers available. On my machine I have an 8-core processor, and in this case there are 5 tasks (one for each row of prelim$x_missing).
Granted, in typical use I wouldn't be parallelizing work for 5 rows. This is just a unit test.
R> prelim$x_missing
X1 X2 X3 X4 X5 X6 X7 X8 X9 X10 d_factor
6 6 0.2604170 -0.5966874 NA NA -0.3013053 0.24313272 0.2836760 0.3977164 -0.60711109 -0.2929253 1
7 7 -0.8540576 0.1409047 NA 0.4801685 -0.9324517 -0.06487733 -0.2220201 NA 1.19077335 -0.3702607 2
8 8 0.5118453 -0.8750674 NA 0.1787238 0.6897163 0.20695122 NA -0.3488021 0.84200408 -0.4791230 1
12 12 0.3695746 -0.4919277 -1.2509180 1.1642152 NA 0.04018417 NA NA -0.53436589 -1.5400345 2
15 15 NA -0.3608242 -0.6761515 -0.5366562 0.1763501 NA NA 0.4967595 0.02635203 -0.6015536 1
Note that I am making the cluster via cl <- parallel::makeCluster(detectCores() - leave_cores), where detectCores() returns 8 for my current machine and leave_cores is a parameter for the number of cores to leave open. When I make a cluster with more nodes than there are rows in the use case, the function fails; when I make a cluster with at most as many nodes as rows, the function works:
# works : detectCores() == 8, 8 - 3 == 5 (number of rows / processes)
R> foo_parallel(prelim$x_missing, x3[[1]],k=3,q=2, leave_cores= 3)
[1] 1.0216313 0.7355635 0.9201501 0.6906554 0.6613939 0.3628872 0.9995641 0.8571252 0.9271800 0.9201501 0.9238215 0.9798824 0.9059506
[14] 0.6891484 1.0158223 0.5442953 0.6906554 0.9238215 0.8607280 0.5897955 1.1084943 0.8518322 0.9227102 0.6613939 0.9798824 0.8607280
[27] 0.9518105 0.9792209 1.1968528 0.4447104 0.3628872 0.9059506 0.5897955 0.9518105 1.1249624
# fails : 8-2 = 6; 6 > nrow(prelim$x_missing)
R> foo_parallel(prelim$x_missing, x3[[1]],k=3,q=2, leave_cores= 2)
Error in checkForRemoteErrors(val) :
one node produced an error: ref must be an integer in {1, nrow(x)}.
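The same failure reproduces without my package at all. Here is a minimal sketch (my own toy example, not the imputation code) in which the worker function checks its input much as dist_q.matrix checks ref; with more nodes than rows of work, it fails the same way:
library(parallel)
## 5 rows of work; the first column is a row id, as in x_missing above
m <- cbind(id = 1:5, matrix(rnorm(45), nrow = 5))
cl <- makeCluster(7)  # deliberately more nodes than rows
res <- tryCatch(
  parRapply(cl, m, function(i) {
    ref <- as.integer(i[1])
    stopifnot(ref >= 1L)  # stands in for dist_q.matrix's check on ref
    sum(i[-1])
  }),
  error = function(e) conditionMessage(e))
stopCluster(cl)
res  # reports that a node produced an error, rather than returning 5 sums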
tl;dr
As described in the parallel package vignette, detectCores simply detects the number of cores; it very reasonably does not attempt any intelligent assignment of tasks to workers.
function detectCores() tries to determine the number of CPU cores in the machine on which R is running: it has ways to do so on all known current R platforms. What exactly it measures is OS-specific: we try where possible to report the number of physical cores available. On Windows the default is to report the number of logical CPUs. On modern hardware (e.g. Intel Core i7) the latter may not be unreasonable as hyper-threading does give a significant extra throughput.
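Given that, the workaround on my side is simple: cap the node count at the number of tasks before making the cluster. A minimal sketch (make_capped_cluster is my own helper name, not anything from library(parallel)):
library(parallel)
## my own guard, not something parallel provides: never create more
## nodes than there are rows of work, so no chunk comes back empty
make_capped_cluster <- function(n_tasks, leave_cores = 1L) {
  n_nodes <- max(1L, min(detectCores() - leave_cores, n_tasks))
  makeCluster(n_nodes)
}
## e.g., inside foo_parallel:
## cl <- make_capped_cluster(nrow(x_missing), leave_cores)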
I am calling the function parallel::parRapply to do the computation. parRapply dispatches the work to the workers via the splitRows function, but there doesn't seem to be any intelligence or error-catching in splitRows.
R> parRapply
function (cl = NULL, x, FUN, ...)
{
cl <- defaultCluster(cl)
do.call(c, clusterApply(cl = cl, x = splitRows(x, length(cl)),
fun = apply, MARGIN = 1L, FUN = FUN, ...), quote = TRUE)
}
<bytecode: 0x00000000380ca530>
<environment: namespace:parallel>
I can't find the source code for splitRows, but parallel:::splitIndices seems similar:
R> parallel:::splitIndices
function (nx, ncl)
{
i <- seq_len(nx)
if (ncl == 0L)
list()
else if (ncl == 1L || nx == 1L)
list(i)
else {
fuzz <- min((nx - 1L)/1000, 0.4 * nx/ncl)
breaks <- seq(1 - fuzz, nx + fuzz, length = ncl + 1L)
structure(split(i, cut(i, breaks)), names = NULL)
}
}
<bytecode: 0x00000000380a7828>
<environment: namespace:parallel>
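If splitRows is, as I suspect, just a thin wrapper that applies the splitIndices chunks to the rows of the matrix, it would look something like this (my reconstruction, not verified against the parallel sources):
## presumed shape of splitRows -- my guess, not verified source:
## an empty index chunk from splitIndices becomes a zero-row matrix
splitRows <- function(x, ncl)
  lapply(parallel:::splitIndices(nrow(x), ncl),
         function(i) x[i, , drop = FALSE])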
In my unit test, splitIndices would execute as follows:
# all 8 cores:
nx <- 5; ncl <- 8
i <- seq_len(nx)
fuzz <- min((nx - 1L)/1000, 0.4 * nx / ncl)
breaks <- seq(1 - fuzz, nx + fuzz, length= ncl + 1L)
structure(split(i, cut(i, breaks)), names = NULL)
[[1]]
[1] 1
[[2]]
integer(0)
[[3]]
[1] 2
[[4]]
integer(0)
[[5]]
[1] 3
[[6]]
[1] 4
[[7]]
integer(0)
[[8]]
[1] 5
There are three integer(0) chunks here, and these are what cause the failure further down the call stack.
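Why does a zero-row chunk fail at all, rather than simply contributing nothing? My reading of apply's source is that when the margin has zero extent, apply still calls FUN once with a dummy argument in order to work out the shape of the result, and it is that dummy call that dies inside dist_q.matrix. A quick demonstration:
## an empty chunk arrives at the worker as a zero-row matrix
empty_chunk <- matrix(numeric(0), nrow = 0, ncol = 3)
## apply() nevertheless invokes FUN once (with a dummy argument, to
## determine the shape of the result), so FUN must be able to survive it
apply(empty_chunk, 1, function(i) stop("FUN was called with a dummy argument"))
## errors with "FUN was called with a dummy argument", proving FUN ran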
# 3 cores (just showing the return):
structure(split(i, cut(i, breaks)), names = NULL)
[[1]]
[1] 1 2
[[2]]
[1] 3
[[3]]
[1] 4 5
If anyone can provide a link in the comments below to the source code for splitRows, I'll happily update this answer. Code for parallel::clusterApply and parallel:::staticClusterApply is easily found.