R: for loop within a foreach loop

2019-01-29 03:12发布

EDIT: Reduced the size of the dataset

A sample data:

df <- data.frame(loc.id = rep(1:10, each = 80*36), 
             year = rep(rep(1980:2015, each = 80), times = 10),
             day = rep(rep(1:80, times = 36),times = 10),
             rain = runif(10*36*80, min = 0 , max = 5),
             swc = runif(10*36*80,min = 0, max = 50),
             SW.max = rep(runif(10, min = 100, max = 200), each = 80*36),
             SW.ini = runif(10*36*80),
             PETc = runif(10*36*80, min = 0 , max = 1.3),
             SW = NA,
             PAW = NA, 
             aetc = NA)

df contains daily data (80 days) for 1980-2015 for 10 locations. For each location X year combination, I want to do following calculation

list.result <- list() # create a list to store all results
ptm <- proc.time()
n <- 0

for(i in seq_along(unique(df$loc.id))){

location <- unique(df$loc.id)[i]
print(location)

for(j in seq_along(unique(df$year))){

yr <- unique(df$year)[j]
print(yr)

df_year <- df[df$loc.id == location & df$year == yr,] # subset data for location i and year y

# for the first row of data frame, i need to calculate some values 
SW.ini <- df_year$SW.ini[1] 
SW.max <- df_year$SW.max[1]

df_year$PAW[1] <- SW.ini + df_year$rain[1]
df_year$aetc[1] <- ifelse(df_year$PAW[1] >= df_year$swc[1], 
df_year$PETc[1],(df_year$PAW[1]/df_year$swc[1])*df_year$PETc[1])
df_year$aetc[1] <- ifelse(df_year$aetc[1] > df_year$PAW[1], df_year$PAW[1], df_year$aetc[1])
df_year$SW[1] <- SW.ini + df_year$rain[1] -  df_year$aetc[1]
df_year$SW[1] <- ifelse(df_year$SW[1] > SW.max, SW.max, ifelse(df_year$SW[1] < 0, 0,df_year$SW[1]))

# for row 2 till row n of df_year, I need to do this:
for (day in 2:nrow(df_year)){
df_year$PAW[day] <- df_year$SW[day - 1] + df_year$rain[day]

df_year$aetc[day] <- ifelse(df_year$PAW[day] >= df_year$swc[day], df_year$PETc[day], (df_year$PAW[day]/df_year$swc[day]) * df_year$PETc[day])

df_year$aetc[day] <- ifelse(df_year$aetc[day] > df_year$PAW[day], df_year$PAW[day],df_year$aetc[day])

df_year$SW[day] <- df_year$SW[day - 1] + df_year$rain[day] -  df_year$aetc[day]

df_year$SW[day] <- ifelse(df_year$SW[day] > SW.max,SW.max, ifelse(df_year$SW[day] < 0, 0,df_year$SW[day]))

   }
n <- n + 1
list.result[[n]] <- df_year
}}
proc.time() - ptm
user  system elapsed 
8.64    0.00    8.75

final.dat <- rbindlist(list.result)

This loop is sequential and I thought it is a good candidate for foreach in R. I have not really worked with foreach so doing some online research brought me to this:

  library(doParallel)
  cl <- makeCluster(4) # if I understood this correctly, it assings number of cores to be used 
  registerDoParallel(cl)

  foreach(i = seq_along(unique(df$loc.id)) %dopar% {
    list.result <- list()
    for(j in seq_along(1980:2015)){

      df_year <- df[df$loc.id == unique(df$loc.id)[i] & df$year == unique(df$year)[j],] # subset data for location i and year y

      # for the first row of data frame, i need to calculate some values 
      SW.ini <- df_year$SW.ini[1] 
      SW.max <- df_year$SW.max[1]

      df_year$PAW[1] <- SW.ini + df_year$rain[1]
      df_year$aetc[1] <- ifelse(df_year$PAW[1] >= df_year$swc[1], df_year$PETc[1],(df_year$PAW[1]/df_year$swc[1])*df_year$PETc[1])
      df_year$aetc[1] <- ifelse(df_year$aetc[1] > df_year$PAW[1], df_year$PAW[1], df_year$aetc[1])
      df_year$SW[1] <- SW.ini + df_year$rain[1] -  df_year$aetc[1]
      df_year$SW[1] <- ifelse(df_year$SW[1] > SW.max, SW.max, ifelse(df_year$SW[1] < 0, 0,df_year$SW[1]))

      # for row 2 till row n of df_year, I need to do this:
      for (day in 2:nrow(df_year)){
        df_year$PAW[day] <- df_year$SW[day - 1] + df_year$rain[day]
        df_year$aetc[day] <- ifelse(df_year$PAW[day] >= df_year$swc[day], df_year$PETc[day], (df_year$PAW[day]/df_year$swc[day]) * df_year$PETc[day])
        df_year$aetc[day] <- ifelse(df_year$aetc[day] > df_year$PAW[day], df_year$PAW[day],df_year$aetc[day])
        df_year$SW[day] <- df_year$SW[day - 1] + df_year$rain[day] -  df_year$aetc[day]
        df_year$SW[day] <- ifelse(df_year$SW[day] > SW.max,SW.max, ifelse(df_year$SW[day] < 0, 0,df_year$SW[day]))

      }
      list.result[[j]] <- df_year
    }
    dat <- rbindlist(list.result)
    fwrite(dat,paste0(i,"dat.csv"))
 }

My questions are:

1) Is the above data a good candidate for foreach

2) There is a for-loop within the foreach. Does that make sense?

3) How do I make the above foreach run and return all the results

2条回答
倾城 Initia
2楼-- · 2019-01-29 03:47

This code replaces the inner loop

clamp <- function(x, low, high)
    min(high, max(low, x))

fill1 <- function(df) {
    rain <- df$rain
    swc <- df$swc
    PETc <- df$PETc

    SW0 <- df$SW.ini[1]
    SW.max <- df$SW.max[1]

    SW <- PAW <- aetc <- numeric(nrow(df))

    for (day in seq_along(rain)) {
        PAW[day] <- SW0 + rain[day]

        if (PAW[day] >= swc[day]) {
            aetc0 <- PETc[day]
        } else {
            aetc0 <- (PAW[day] / swc[day]) * PETc[day]
        }
        aetc[day] <- min(PAW[day], aetc0)

        SW0 <- SW[day] <- clamp(PAW[day] -  aetc[day], 0, SW.max)
    }

    list(SW = SW, PAW = PAW, aetc = aetc)
}

and is about 60x faster than the implementation in the original question. Note that this is the approach taken in C++, i.e., allocate and update new vectors, rather than existing parts of the data.frame; this is a big part of the performance difference, and the benefit can be obtained WITHOUT Rcpp.

This is a generalization (very light testing!) to iterate on a location.year x day matrix

pclamp <- function(x, low, high)
    pmin(high, pmax(low, x))

fill2 <- function(rain, swc, PETc, SW0, SW.max) {

    SW <- PAW <- aetc <- matrix(0, nrow = nrow(rain), ncol = ncol(rain))

    for (day in seq_len(ncol(rain))) {
        PAW[, day] <- SW0 + rain[, day]

        aetc0 <- PETc[, day]
        idx <- PAW[, day] < swc[, day]
        aetc0[idx] <- (PAW[idx, day] / swc[idx, day]) * PETc[idx, day]
        aetc[, day] <- pmin(PAW[, day], aetc0)

        SW0 <- SW[, day] <- pclamp(PAW[, day] -  aetc[, day], 0, SW.max)
    }

    list(SW = SW, PAW = PAW, aetc = aetc)
}

with inputs from the original, assuming the input is sorted by year, location, and day

days <- 80
rain <- matrix(df$rain, ncol=days, byrow=TRUE)
swc <- matrix(df$swc, ncol=days, byrow=TRUE)
PETc <- matrix(df$PETc, ncol=days, byrow=TRUE)
SW.ini <- df$SW.ini[df$day == 1]
SW.max <- df$SW.max[df$day == 1]

result <- fill2(rain, swc, PETc, SW.ini, SW.max)

It is about 15x faster than fill1() on a per-location.date basis, for the subset of data in the question. The operation on the sample data takes about 10 milliseconds, and about 10 seconds for the full data -- 5x slower than Matt's C++ solution but still a very substantial improvement over the original and employing basic R techniques that will improve code in many different areas.

查看更多
仙女界的扛把子
3楼-- · 2019-01-29 03:53

To address your three questions:

  1. I don't think so. (More computationally efficient methods can completely eliminate the need for adding more processing power.)
  2. Nothing inherently bad about for loops within parallel processing. (In fact, the more computation that needs to be done on each chunk, the more likely parallel methods could give a performance improvement.)
  3. (Not applicable if you use the methods below)

Using Rcpp and data.table instead

Compiling the logic with C++ and applying it by group using data.table grouping operations gives a ~2,000x speed-up from your baseline, far greater than you might hope to get by parallelizing.

On your original example, which had 39,420,000 rows, this executes on my machine in 1.883 seconds; and on the revised one with 28,800 rows, this executes in 0.004 seconds

library(data.table)
library(Rcpp)

Define and compile a C++ function, CalcSW() inline in the R script:

One note: counting in C/C++ starts at 0, unlike R, which starts at 1-- that's why the indices are different here

Rcpp::cppFunction('
List CalcSW(NumericVector SW_ini,
            NumericVector SW_max,
            NumericVector rain,
            NumericVector swc,
            NumericVector PETc) {

  int n = SW_ini.length();
  NumericVector SW(n);
  NumericVector PAW(n);
  NumericVector aetc(n);

  double SW_ini_glob = SW_ini[0];
  double SW_max_glob = SW_max[0];

  SW[0] = SW_ini_glob;
  PAW[0] = SW[0] + rain[0];

  if (PAW[0] > swc[0]){
    aetc[0] = PETc[0];
  } else {
    aetc[0] = PAW[0]/swc[0]*PETc[0];
  }

  if (aetc[0] > PAW[0]){
    aetc[0] = PAW[0];
  }

  SW[0] = SW[0] + rain[0] - aetc[0];

  if(SW[0] > SW_max_glob){
    SW[0] = SW_max_glob;
  }

  if(SW[0] < 0){
    SW[0] = 0;
  }

  for (int i = 1; i < n; i++) {

    PAW[i] = SW[i-1] + rain[i];

    if (PAW[i] > swc[i]){
      aetc[i] = PETc[i];
    } else {
      aetc[i] = PAW[i]/swc[i]*PETc[i];
    }

    if (aetc[i] > PAW[i]){
      aetc[i] = PAW[i];
    }

    SW[i] = SW[i-1] + rain[i] - aetc[i];

    if(SW[i] > SW_max_glob){
      SW[i] = SW_max_glob;
    }

    if(SW[i] < 0){
     SW[i] = 0;
    }
  }
  return Rcpp::List::create(Rcpp::Named("SW") = SW,
                            Rcpp::Named("PAW") = PAW,
                            Rcpp::Named("aetc") = aetc);
}')

Create data.table

df <- data.table(loc.id = rep(1:10, each = 80*36), 
                 year = rep(rep(1980:2015, each = 80), times = 10),
                 day = rep(rep(1:80, times = 36),times = 10),
                 rain = runif(10*36*80, min = 0 , max = 5),
                 swc = runif(10*36*80,min = 0, max = 50),
                 SW_max = rep(runif(10, min = 100, max = 200), each = 80*36),
                 SW_ini = runif(10*36*80),
                 PETc = runif(10*36*80, min = 0 , max = 1.3),
                 SW = as.numeric(NA),
                 PAW = as.numeric(NA), 
                 aetc = as.numeric(NA))

setkey(df, loc.id, year, day)

Execute the function CalcSW() on the df for each combination of loc.id and year, assign returned values to the three columns simultaneously:

system.time({
  df[,  c("SW","PAW","aetc") := CalcSW(SW_ini,
                                       SW_max,
                                       rain,
                                       swc,
                                       PETc), keyby = .(loc.id, year)]
})

...

   user  system elapsed 
  0.004   0.000   0.004 

Results:

head(df)

...

   loc.id year day       rain       swc   SW_max     SW_ini      PETc       SW      PAW       aetc
1:      1 1980   1 0.35813251 28.360715 177.3943 0.69116310 0.2870478 1.038675 1.049296 0.01062025
2:      1 1980   2 1.10331116 37.013022 177.3943 0.02742273 0.4412420 2.125335 1.396808 0.01665171
3:      1 1980   3 1.76680011 32.509970 177.3943 0.66273062 1.1071233 3.807561 2.483467 0.08457420
4:      1 1980   4 3.20966558  8.252797 177.3943 0.12220454 0.3496968 6.840713 4.165693 0.17651342
5:      1 1980   5 1.32498191 14.784203 177.3943 0.66381497 1.2168838 7.573160 7.198845 0.59253503
6:      1 1980   6 0.02547458 47.903637 177.3943 0.21871598 1.0864713 7.418750 7.931292 0.17988449

I'm not 100% positive I implemented your logic perfectly, but the logic should be pretty straightforward to tweak where I may have missed something, I implemented it in a very similar manner to how you laid it out.


One other note: It's way easier to write C++ with auto-indenting and code highlighting (whether you're using RStudio or Emacs) you get if you create a separate file, named something like TestCode.cppformatted like below.

Then, you can either use Rcpp::sourceCpp("TestCode.cpp") to compile your function in your R Script, or you can copy and paste everything except for the first three lines as a character string into as an argument of Rcpp::cppFunction() like I did above.

#include <Rcpp.h>
using namespace Rcpp;

// [[Rcpp::export]]
List CalcSW(NumericVector SW_ini,
                     NumericVector SW_max,
                     NumericVector rain,
                     NumericVector swc,
                     NumericVector PETc) {

  int n = SW_ini.length();
  NumericVector SW(n);
  NumericVector PAW(n);
  NumericVector aetc(n);

  double SW_ini_glob = SW_ini[0];
  double SW_max_glob = SW_max[0];

  SW[0] = SW_ini_glob;
  PAW[0] = SW[0] + rain[0];

  if (PAW[0] > swc[0]){
    aetc[0] = PETc[0];
  } else {
    aetc[0] = PAW[0]/swc[0]*PETc[0];
  }

  if (aetc[0] > PAW[0]){
    aetc[0] = PAW[0];
  }

  SW[0] = SW[0] + rain[0] - aetc[0];

  if(SW[0] > SW_max_glob){
    SW[0] = SW_max_glob;
  }

  if(SW[0] < 0){
    SW[0] = 0;
  }

  for (int i = 1; i < n; i++) {

    PAW[i] = SW[i-1] + rain[i];

    if (PAW[i] > swc[i]){
      aetc[i] = PETc[i];
    } else {
      aetc[i] = PAW[i]/swc[i]*PETc[i];
    }

    if (aetc[i] > PAW[i]){
      aetc[i] = PAW[i];
    }

    SW[i] = SW[i-1] + rain[i] - aetc[i];

    if(SW[i] > SW_max_glob){
      SW[i] = SW_max_glob;
    }

    if(SW[i] < 0){
      SW[i] = 0;
    }
  }
  return Rcpp::List::create(Rcpp::Named("SW") = SW,
                            Rcpp::Named("PAW") = PAW,
                            Rcpp::Named("aetc") = aetc);
}
查看更多
登录 后发表回答