I am using foreach() with %dopar% from the R package foreach to run long (~days) calculations in parallel. I would like the ability to stop the entire set of calculations in the event that one of them produces an error. However, I have not found a way to achieve this, and from the documentation and various forums I have found no indication that this is possible. In particular, break() does not work, and stop() only stops the current calculation, not the whole foreach loop.
Note that I cannot use a simple for loop, because ultimately I want to parallelize this using the doRNG package.
Here is a simplified, reproducible version of what I am attempting (shown here in serial with %do%, but I have the same problem when using doRNG and %dopar%). Note that in reality I want to run all of the elements of this loop (here 10) in parallel.
library(foreach)
myfunc <- function() {
  x <- foreach(k = 1:10, .combine = "cbind", .errorhandling = "stop") %do% {
    cat("Element ", k, "\n")
    Sys.sleep(0.5) # just to show that stop does not cause exit from foreach
    if (is.element(k, 2:6)) {
      cat("Should stop\n")
      stop("Has stopped")
    }
    k
  }
  return(x)
}
x <- myfunc()
# stop() halts the processing of k = 2:6, but it does not stop the foreach loop itself.
# x is not returned. The execution produces the error message
# Error in { : task 2 failed - "Has stopped"
What I would like to achieve is that the entire foreach loop can be exited immediately upon some condition (here, when the stop() is encountered).
I have found no way to achieve this with foreach. It seems that I would need a way to send a message to all the other processes to make them stop too.
If it is not possible with foreach, does anyone know of alternatives? I have also tried to achieve this with parallel::mclapply, but that does not work either.
> sessionInfo()
R version 3.0.0 (2013-04-03)
Platform: x86_64-apple-darwin10.8.0 (64-bit)
locale:
[1] C/UTF-8/C/C/C/C
attached base packages:
[1] stats graphics grDevices utils datasets methods base
other attached packages:
[1] foreach_1.4.0
loaded via a namespace (and not attached):
[1] codetools_0.2-8 compiler_3.0.0 iterators_1.0.6
It sounds like you want an impatient version of the "stop" error handling. You could implement that by writing a custom combine function and arranging for foreach to call it as soon as each result is returned. To do that you need to:
- use a backend that calls combine on the fly, like doMPI or doRedis
- not enable .multicombine
- set .inorder to FALSE
- set .init to something (like NULL)
Here's an example that does that:
Note that I also set the error handling to "pass" so foreach will call the combine function with an error object. The callCC function is used to return from the foreach loop regardless of the error handling used within foreach and the backend. In this case, callCC will call the abortable function, passing it a function object that is used to force callCC to return immediately. By calling that function from the combine function we can escape from the foreach loop when we detect an error object, and have callCC return that object. See ?callCC for more information.

You can actually use parfun without a parallel backend registered and verify that the foreach loop "breaks" as soon as it executes a task that throws an error, but that could take a while since the tasks are executed sequentially. For example, this takes 20 seconds to execute if no backend is registered:

When executing parfun in parallel, we need to do more than simply break out of the foreach loop: we also need to stop the workers, otherwise they will continue to compute their assigned tasks. With doMPI, the workers can be stopped using mpi.abort:

Note that the cluster object can't be used after the loop aborts, because things weren't properly cleaned up, which is why the normal "stop" error handling doesn't work this way.
The answer I got from REvolution Technical support: "no--foreach doesn't currently have a way to stop all parallel computations on an error to any one".
Steve Weston's original answer essentially answered this. But here is a slightly modified version of his answer which also preserves two additional features in the way I need them: (1) random number generation; (2) printing run-time diagnostics.
When this file is sourced, it exits as intended with an error message.
The 'log.txt' file provides correct diagnostics up to the point of error, and then provides additional error information. Crucially, the execution of all tasks is halted as soon as the stop() in the foreach loop is encountered: it does not wait until the entire foreach loop has completed. Thus I only see the 'Completed task' message up to i=4. (Note that if Sys.sleep() is shorter, then later tasks may be started before the mpi.abort() is processed.)
If I change the stop condition to "i==100", then the stop, and hence the error, is not triggered. The code successfully exits without an error message, and r is a 2D array with dimensions 12 x 5.
Incidentally, it seems that I don't actually need .inorder=FALSE (I think that just gives me a small speed increase in the event that an error is found).
It's not a direct answer to your question, but using when() you can avoid entering the loop if a condition is satisfied:

EDIT:
I forgot something: I think it's by design that you cannot just stop the foreach loop. If you run the loop in parallel, each iteration is processed independently, which means that when you stop the entire loop for k=2, it is not predictable whether the process for k=1 has already terminated or is still running. Hence, using the when() condition gives you a deterministic result.

EDIT 2: Another solution considering your comment.
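A minimal sketch of both ideas in this answer, run sequentially with %do% and reusing the question's stop condition (k in 2:6). The first part uses when() to filter out iterations deterministically. The second part is one possible way, assumed here rather than taken from the answer, to get the behaviour described next: a hypothetical flag file created by the failing task tells every later task to skip its work. run_with_flag() and the flag file are illustrative names, and the flag only works if all workers see the same filesystem (for example, a single machine).

```r
library(foreach)

## (1) when(): iterations whose condition is FALSE are never evaluated,
##     so the result does not depend on timing.
x <- foreach(k = 1:10, .combine = c) %:%
  when(!is.element(k, 2:6)) %do% {
    k
  }
x  # c(1, 7, 8, 9, 10)

## (2) Hypothetical stop flag: the failing task creates a file, and every task
##     checks for it first, so tasks started afterwards skip their work.
##     Tasks already running when the flag appears still finish normally.
flag <- tempfile("foreach-stop-")
run_with_flag <- function(flag) {
  foreach(k = 1:10, .combine = c, .errorhandling = "stop") %do% {
    if (file.exists(flag)) {
      NULL                          # an earlier task failed: skip the work
    } else if (is.element(k, 2:6)) {
      file.create(flag)             # tell all later tasks to skip
      stop("Has stopped")
    } else {
      Sys.sleep(0.1)                # stand-in for the real computation
      k
    }
  }
}
y <- tryCatch(run_with_flag(flag), error = function(e) conditionMessage(e))
unlink(flag)
```

Unlike the callCC() approach, the flag cannot interrupt a task that is already running; it only saves the time of the tasks that have not started yet.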
Using this solution, the processes that are already running when the stop condition becomes true still run to completion, but you avoid spending time on all subsequent ones.
I am not having much luck getting foreach to do what I want, so here is a solution using the parallel package which seems to do what I want. I run my function, do.task(), with mcparallel() and use the intermediate option of mccollect() to pass its results immediately to the function check.res(). If do.task() throws an error, check.res() uses it to trigger tools::pskill(), which explicitly kills all workers. This might not be very elegant, but it works in the sense that it causes an instant stop of all workers. Furthermore, I can simply inherit all the variables I need for the processing in do.task() from the present environment. (In reality do.task() is a much more complex function requiring many variables to be passed in.)

This gives the following screen dump and results for variable res:
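A minimal, self-contained sketch of this approach, assuming a Unix-alike (mcparallel() forks, so it is not available on Windows). do.task() here is a hypothetical stand-in in which task 3 fails and the others just sleep. check.res() is passed to mccollect() as its intermediate function, which mccollect() calls with the list of results received so far; mcparallel() returns an error as a "try-error" object, and as soon as one appears, check.res() kills every job that has not reported yet.

```r
library(parallel)  # mcparallel()/mccollect() rely on forking (Unix-alikes only)

## Hypothetical stand-in for do.task(): task 3 fails, the others are slow.
do.task <- function(k) {
  Sys.sleep(0.5 * k)
  if (k == 3) stop("task ", k, " failed")
  k^2
}

jobs <- lapply(1:6, function(k) mcparallel(do.task(k)))
pids <- vapply(jobs, function(j) j$pid, integer(1))

## Called by mccollect() whenever new results arrive; res holds the results
## so far, with NULL for jobs that have not reported yet.
check.res <- function(res) {
  failed <- vapply(res, inherits, logical(1), what = "try-error")
  if (any(failed)) {
    pending <- vapply(res, is.null, logical(1))
    tools::pskill(pids[pending], tools::SIGKILL)  # kill all unfinished jobs
  }
}

res <- mccollect(jobs, intermediate = check.res)
```

In this sketch, jobs that were killed never deliver a result, so their entries in res stay NULL, while the jobs that finished before the failure keep their values.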