What is the easiest way to parallelize a vectorized function in R?

Published 2019-03-11 03:52

Question:

I have a very large list X and a vectorized function f. I want to calculate f(X), but this will take a long time if I do it with a single core. I have (access to) a 48-core server. What is the easiest way to parallelize the calculation of f(X)? The following is not the right answer:

library(foreach)
library(doMC)
registerDoMC()

foreach(x=X, .combine=c) %dopar% f(x)

The above code will indeed parallelize the calculation of f(X), but it will do so by applying f separately to every element of X. This ignores the vectorized nature of f and will probably make things slower as a result, not faster. Rather than applying f elementwise to X, I want to split X into reasonably-sized chunks and apply f to those.

So, should I just manually split X into 48 equal-sized sublists, apply f to each in parallel, and then manually stitch the results back together (roughly as in the sketch below)? Or is there a package designed for this?
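
For concreteness, here is a minimal sketch of the manual approach I mean, using the base parallel package (X and f stand in for my actual data and function; the chunk count of 48 matches the server):

library(parallel)

## Split X into 48 roughly equal-sized chunks, apply f to each chunk on a
## separate core, then flatten the per-chunk results back into one vector
n <- 48
chunks <- split(X, cut(seq_along(X), n, labels=FALSE))
result <- unlist(mclapply(chunks, f, mc.cores=n), use.names=FALSE)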

In case anyone is wondering, my specific use case is here.

Answer 1:

The itertools package was designed to address this kind of problem. In this case, I would use isplitVector:

library(itertools)

n <- getDoParWorkers()
foreach(x=isplitVector(X, chunks=n), .combine='c') %dopar% f(x)

For this example, pvec is undoubtedly faster and simpler, but this approach also works on Windows with the doParallel package, for example.
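
For reference, a minimal sketch of the same chunked foreach on a cluster backend, assuming a Windows machine (the worker count of 4 is a placeholder):

library(doParallel)
library(itertools)

cl <- makeCluster(4)   # PSOCK cluster; works on Windows, unlike fork-based doMC
registerDoParallel(cl)

n <- getDoParWorkers()
result <- foreach(x=isplitVector(X, chunks=n), .combine='c') %dopar% f(x)

stopCluster(cl)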



Answer 2:

Although this is an older question, it might be interesting to anyone who stumbles upon it via Google (like me): have a look at the pvec function in the multicore package. I think it does exactly what you want.
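
A minimal usage sketch (X and f here are stand-ins for your own data and function; pvec splits the vector into chunks, applies the function to each chunk on a separate core, and concatenates the results):

library(multicore)   # in modern R, pvec also lives in the base "parallel" package

X <- runif(1e6)
f <- function(x) sqrt(x) + 1   # any vectorized function

result <- pvec(X, f)
identical(result, f(X))        # TRUE for a deterministic f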



Answer 3:

Here's my implementation. It's a function chunkapply that takes a vectorized function, a list of arguments that should be vectorized, and a list of arguments that should not be vectorized (i.e. constants), and returns the same result as calling the function on the arguments directly, except that the result is calculated in parallel. For a function f, vector arguments v1, v2, v3, and scalar arguments s1, s2, the following should return identical results:

f(a=v1, b=v2, c=v3, d=s1, e=s2)
f(c=v3, b=v2, e=s2, a=v1, d=s1)
chunkapply(FUN=f, VECTOR.ARGS=list(a=v1, b=v2, c=v3), SCALAR.ARGS=list(d=s1, e=s2))
chunkapply(FUN=f, SCALAR.ARGS=list(e=s2, d=s1), VECTOR.ARGS=list(a=v1, c=v3, b=v2))

Since it is impossible for the chunkapply function to know which arguments of f are vectorized and which are not, it is up to you to specify which is which when you call it, or else you will get the wrong results. You should generally name your arguments to ensure that they get bound correctly.

library(foreach)
library(iterators)
# Use your favorite doPar backend here
library(doMC)
registerDoMC()

get.chunk.size <- function(vec.length,
                           min.chunk.size=NULL, max.chunk.size=NULL,
                           max.chunks=NULL) {
  if (is.null(max.chunks)) {
    max.chunks <- getDoParWorkers()
  }
  size <- vec.length / max.chunks
  if (!is.null(max.chunk.size)) {
    size <- min(size, max.chunk.size)
  }
  if (!is.null(min.chunk.size)) {
    size <- max(size, min.chunk.size)
  }
  num.chunks <- ceiling(vec.length / size)
  actual.size <- ceiling(vec.length / num.chunks)
  return(actual.size)
}

ichunk.vectors <- function(vectors=NULL,
                           min.chunk.size=NULL,
                           max.chunk.size=NULL,
                           max.chunks=NULL) {
  ## Calculate number of chunks
  recycle.length <- max(sapply(vectors, length))
  actual.chunk.size <- get.chunk.size(recycle.length, min.chunk.size, max.chunk.size, max.chunks)
  num.chunks <- ceiling(recycle.length / actual.chunk.size)

  ## Make the chunk iterator
  i <- 1
  it <- idiv(recycle.length, chunks=num.chunks)
  nextEl <- function() {
    n <- nextElem(it)
    ix <- seq(i, length.out = n)
    i <<- i + n
    ## Index with recycling, so shorter vectors wrap around as in R arithmetic
    vchunks <- foreach(v=vectors) %do% v[1 + (ix - 1) %% length(v)]
    names(vchunks) <- names(vectors)
    vchunks
  }
  obj <- list(nextElem = nextEl)
  class(obj) <- c("ichunk", "abstractiter", "iter")
  obj
}
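
A quick illustration of what the iterator yields (toy values; nextElem is the generic from the iterators package):

it <- ichunk.vectors(vectors=list(a=1:10, b=letters[1:10]), max.chunks=3)
nextElem(it)   # first chunk:  list(a=1:4, b=letters[1:4])
nextElem(it)   # second chunk: list(a=5:7, b=letters[5:7])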

chunkapply <- function(FUN, VECTOR.ARGS, SCALAR.ARGS=list(), MERGE=TRUE, ...) {
  ## Check that the arguments make sense
  stopifnot(is.list(VECTOR.ARGS))
  stopifnot(length(VECTOR.ARGS) >= 1)
  stopifnot(is.list(SCALAR.ARGS))
  ## Choose appropriate combine function
  if (MERGE) {
    combine.fun <- append
  } else {
    combine.fun <- foreach:::defcombine
  }
  ## Chunk and apply, and maybe merge
  foreach(vchunk=ichunk.vectors(vectors=VECTOR.ARGS, ...),
          .combine=combine.fun) %dopar%
  {
    do.call(FUN, args=append(vchunk, SCALAR.ARGS))
  }
}

## Only do chunkapply if it will run in parallel
maybe.chunkapply <- function(FUN, VECTOR.ARGS, SCALAR.ARGS=list(), ...) {
  if (getDoParWorkers() > 1) {
    chunkapply(FUN, VECTOR.ARGS, SCALAR.ARGS, ...)
  } else {
    do.call(FUN, append(VECTOR.ARGS, SCALAR.ARGS))
  }
}

Here are some examples showing that chunkapply(f,list(x)) produces identical results to f(x). I have set the max.chunk.size extremely small to ensure that the chunking algorithm is actually used.

> # Generate all even integers from 2 to 100 inclusive
> identical(chunkapply(function(x,y) x*y, list(1:50), list(2), max.chunk.size=10), 1:50 * 2)
[1] TRUE

> ## Sample from a standard normal distribution, then discard values greater than 1
> a <- rnorm(n=100)
> cutoff <- 1
> identical(chunkapply(function(x,limit) x[x<=limit], list(x=a), list(limit=cutoff), max.chunk.size=10), a[a<=cutoff])
[1] TRUE

If anyone has a better name than "chunkapply", please suggest it.

Edit:

As another answer points out, there is a function called pvec in the multicore package that has very similar functionality to what I have written. For simple cases, you should use that, and you should vote up Jonas Rauch's answer for it. However, my function is a bit more general, so if any of the following apply to you, you might want to consider using my function instead:

  • You need to use a parallel backend other than multicore (e.g. MPI). My function uses foreach, so you can use any parallelization framework that provides a backend for foreach.
  • You need to pass multiple vectorized arguments. pvec only vectorizes over a single argument, so you couldn't easily implement parallel vectorized addition with pvec, for example. My function allows you to specify arbitrary arguments; see the sketch after this list.
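
For example, a short sketch using the chunkapply defined above for parallel vectorized addition over two vector arguments, which pvec cannot express directly:

x <- runif(1e6)
y <- runif(1e6)
identical(chunkapply(`+`, VECTOR.ARGS=list(x, y), max.chunk.size=1e5), x + y)   # TRUE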


Answer 4:

Map-Reduce might be what you're looking for; it has been ported to R (see, for example, the mapReduce package on CRAN).



Answer 5:

How about something like this? R will take advantage of all the available memory, and multicore will parallelize over all available cores.

library(multicore)
result <- mclapply(X, f, mc.preschedule=FALSE, mc.set.seed=FALSE)