Speeding up an interpolation exercise

Posted 2019-02-07 10:40

Question:

I'm running about 45,000 local linear regressions (essentially) on about 1.2 million observations, so I'd appreciate some help trying to speed things up because I'm impatient.

I'm basically trying to construct year-by-position wage contracts--the function wage(experience) given firm, year, and position--for a bunch of firms.

Here's the basic structure of the data set I'm working with:

> wages
         firm year position exp salary
      1: 0007 1996        4   1  20029
      2: 0007 1996        4   1  23502
      3: 0007 1996        4   1  22105
      4: 0007 1996        4   2  23124
      5: 0007 1996        4   2  22700
     ---                              
1175141:  994 2012        5   2  47098
1175142:  994 2012        5   2  45488
1175143:  994 2012        5   2  47098
1175144:  994 2012        5   3  45488
1175145:  994 2012        5   3  47098

I want to construct the wage function for experience levels 0 through 40 for all firms, a la:

> salary_scales
        firm year position exp   salary
     1: 0007 1996        4   0       NA
     2: 0007 1996        4   1 21878.67
     3: 0007 1996        4   2 23401.33
     4: 0007 1996        4   3 23705.00
     5: 0007 1996        4   4 24260.00
    ---                                
611019: 9911 2015        4  36       NA
611020: 9911 2015        4  37       NA
611021: 9911 2015        4  38       NA
611022: 9911 2015        4  39       NA
611023: 9911 2015        4  40       NA

To that end, I've been working (at the suggestion of @BondedDust here) with the COBS (COnstrained B-Spline) package, which allows me to build in the monotonicity of the wage contract.
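
For concreteness, here's a minimal sketch of that core fit, using the small firm 0063 sample posted further below (toy usage only, not the full pipeline): constraint="increase" forces a non-decreasing curve, and predict() on the fit returns a matrix whose "fit" column holds the fitted salaries at the requested experience grid.

library(cobs)
# Toy fit on the firm 0063 / 2010 / position 5 rows from the sample further below
exp_toy    <- c(2, 2, 4, 8, 11, 15, 19, 23, 33, 37)
salary_toy <- c(37433, 38749, 38749, 42700, 47967,
                50637, 51529, 50637, 52426, 52426)
fit <- cobs(x = exp_toy, y = salary_toy,
            constraint = "increase",      # force a non-decreasing wage curve
            nknots = 8, print.mesg = FALSE)
# predict() on a cobs fit returns a matrix with "z" and "fit" columns
head(predict(fit, z = min(exp_toy):max(exp_toy))[, "fit"])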

Some problems remain; in particular, when I need to extrapolate (whenever a given firm doesn't have any very young or very old employees), there's a tendency for the fit to lose monotonicity or to drop below 0.

To get around this, I've been using simple linear extrapolation outside the data bounds--extend the fit curve outside min_exp and max_exp so that it passes through the two lowest (or highest) fit points--not perfect, but it seems to be doing pretty well.
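
In code, that extension is just the line through the two nearest fitted points. A toy illustration of the arithmetic (made-up numbers; the actual implementation is inside cobs_extrap below):

# Hypothetical fitted salaries at exp = 3:6
in_sample <- c(30000, 31000, 31800, 32400)
slope_lo  <- in_sample[2] - in_sample[1]    # slope through the two lowest fitted points
slope_hi  <- in_sample[4] - in_sample[3]    # slope through the two highest fitted points
below <- in_sample[1] - (3:1) * slope_lo    # extrapolated salaries at exp = 0, 1, 2
above <- in_sample[4] + (1:2) * slope_hi    # extrapolated salaries at exp = 7, 8 (truncated here)
c(below, in_sample, above)
# 27000 28000 29000 30000 31000 31800 32400 33000 33600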

With that in mind, here's how I'm doing this so far (keep in mind I'm a data.table fanatic):

#get the range of experience for each firm
wages[,min_exp:=min(exp),by=.(year,firm,position)]
wages[,max_exp:=max(exp),by=.(year,firm,position)]
#Can't interpolate if there are only 2 or 3 unique experience cells represented
wages[,node_count:=length(unique(exp)),by=.(year,firm,position)]
#Nor if there are too few teachers
wages[,ind_count:=.N,by=.(year,firm,position)]
#Also troublesome when there is little variation in salaries like so:
wages[,sal_scale_flag:=mean(abs(salary-mean(salary)))<50,by=.(year,firm,position)]
wages[,sal_count_flag:=length(unique(salary))<5,by=.(year,firm,position)]

cobs_extrap<-function(exp,salary,min_exp,max_exp,
                      constraint="increase",print.mesg=F,nknots=8,
                      keep.data=F,maxiter=150){
  #these are passed as vectors
  min_exp<-min_exp[1]
  max_exp<-min(max_exp[1],40)
  #get in-sample fit
  in_sample<-predict(cobs(x=exp,y=salary,
                          constraint=constraint,
                          print.mesg=print.mesg,nknots=nknots,
                          keep.data=keep.data,maxiter=maxiter),
                     z=min_exp:max_exp)[,"fit"]

  #append by linear extension below min_exp
  c(if (min_exp==1) NULL else in_sample[1]-
      (min_exp:1)*(in_sample[2]-in_sample[1]),in_sample,
    #append by linear extension above max_exp
    if (max_exp==40) NULL else in_sample[length(in_sample)]+(1:(40-max_exp))*
      (in_sample[length(in_sample)]-in_sample[length(in_sample)-1]))
}

salary_scales<-
  wages[node_count>=7&ind_count>=10
               &sal_scale_flag==0&sal_count_flag==0,
               .(exp=0:40,
                 salary=cobs_extrap(exp,salary,min_exp,max_exp)),
               by=.(year,firm,position)]

Notice anything in particular that could be slowing down my code? Or am I forced to be patient?

To play around with, here are some of the smaller firm-position combos:

    firm year position exp salary count
 1: 0063 2010        5   2  37433    10
 2: 0063 2010        5   2  38749    10
 3: 0063 2010        5   4  38749    10
 4: 0063 2010        5   8  42700    10
 5: 0063 2010        5  11  47967    10
 6: 0063 2010        5  15  50637    10
 7: 0063 2010        5  19  51529    10
 8: 0063 2010        5  23  50637    10
 9: 0063 2010        5  33  52426    10
10: 0063 2010        5  37  52426    10
11: 9908 2006        4   1  26750    10
12: 9908 2006        4   6  36043    10
13: 9908 2006        4   7  20513    10
14: 9908 2006        4   8  45023    10
15: 9908 2006        4  13  33588    10
16: 9908 2006        4  15  46011    10
17: 9908 2006        4  15  37179    10
18: 9908 2006        4  22  43704    10
19: 9908 2006        4  28  56078    10
20: 9908 2006        4  29  44866    10
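
If you want to get these rows into a data.table quickly, one option (a sketch, assuming you paste the table as CSV text) is data.table::fread with a character colClass so the leading zeros in firm survive:

library(data.table)
# Read the sample rows above into a data.table; firm is read as character so
# leading zeros like "0063" are preserved.
wages_sample <- fread(text = "firm,year,position,exp,salary,count
0063,2010,5,2,37433,10
0063,2010,5,2,38749,10
0063,2010,5,4,38749,10
0063,2010,5,8,42700,10
0063,2010,5,11,47967,10
0063,2010,5,15,50637,10
0063,2010,5,19,51529,10
0063,2010,5,23,50637,10
0063,2010,5,33,52426,10
0063,2010,5,37,52426,10
9908,2006,4,1,26750,10
9908,2006,4,6,36043,10
9908,2006,4,7,20513,10
9908,2006,4,8,45023,10
9908,2006,4,13,33588,10
9908,2006,4,15,46011,10
9908,2006,4,15,37179,10
9908,2006,4,22,43704,10
9908,2006,4,28,56078,10
9908,2006,4,29,44866,10", colClasses = list(character = "firm"))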

Answer 1:

There are a lot of things in your code that could be improved, but let's focus on the main bottleneck here. The problem at hand is embarrassingly parallel: the data can be split into multiple smaller pieces that can each be computed on separate workers independently, with essentially no coordination overhead.

To see the parallelisation possibilities for the current problem, first note that you are performing exactly the same calculation for each individual firm and/or year separately. You could, for example, split the calculations into smaller subtasks per year and then allocate those subtasks to different CPU cores. A significant performance gain can be obtained in this manner. Finally, when the subtasks are done, the only thing left to do is merge the results.

However, base R and most of its libraries run single-threaded. You have to explicitly split up your data and assign the subtasks to different cores yourself. A number of R packages support this kind of parallel execution; we will use the doParallel package in our example here.

You did not provide an explicit dataset that is big enough to effectively test the performance so we will first create some random data:

set.seed(42)
wages<-data.table(firm=substr(10001:10010,2,5)[sample(10,size=1e6,replace=T)],
                  year=round(runif(1e6,1996,2015)),
                  position=round(runif(1e6,4,5)),
                  exp=round(runif(1e6,1,40)),
                  salary=round(exp(rnorm(1e6,mean=10.682,sd=.286))))
> wages
         firm year position exp salary
      1: 0001 1996        4  14  66136
      2: 0001 1996        4   3  42123
      3: 0001 1996        4   9  46528
      4: 0001 1996        4  11  35195
      5: 0001 1996        4   2  43926
     ---                              
 999996: 0010 2015        5  11  43140
 999997: 0010 2015        5  23  64025
 999998: 0010 2015        5  31  35266
 999999: 0010 2015        5  11  36267
1000000: 0010 2015        5   7  44315

Now, let's run the first part of your code:

#get the range of experience for each firm
wages[,min_exp:=min(exp),by=.(year,firm,position)]
wages[,max_exp:=max(exp),by=.(year,firm,position)]
#Can't interpolate if there are only 2 or 3 unique experience cells represented
wages[,node_count:=length(unique(exp)),by=.(year,firm,position)]
#Nor if there are too few teachers
wages[,ind_count:=.N,by=.(year,firm,position)]
#Also troublesome when there is little variation in salaries like so:
wages[,sal_scale_flag:=mean(abs(salary-mean(salary)))<50,by=.(year,firm,position)]
wages[,sal_count_flag:=length(unique(salary))<5,by=.(year,firm,position)]
> wages
         firm year position exp salary min_exp max_exp node_count ind_count sal_scale_flag sal_count_flag
      1: 0001 1996        4  14  66136       1      40         40      1373          FALSE          FALSE
      2: 0001 1996        4   3  42123       1      40         40      1373          FALSE          FALSE
      3: 0001 1996        4   9  46528       1      40         40      1373          FALSE          FALSE
      4: 0001 1996        4  11  35195       1      40         40      1373          FALSE          FALSE
      5: 0001 1996        4   2  43926       1      40         40      1373          FALSE          FALSE
     ---                                                                                                 
 999996: 0010 2015        5  11  43140       1      40         40      1326          FALSE          FALSE
 999997: 0010 2015        5  23  64025       1      40         40      1326          FALSE          FALSE
 999998: 0010 2015        5  31  35266       1      40         40      1326          FALSE          FALSE
 999999: 0010 2015        5  11  36267       1      40         40      1326          FALSE          FALSE
1000000: 0010 2015        5   7  44315       1      40         40      1326          FALSE          FALSE

We will now process the wages in a single-threaded manner, as you have done before, and store the result as salary_scales_1 so that we can compare it with the parallel results later:

start <- Sys.time()
salary_scales_1 <-
  wages[node_count>=7&ind_count>=10
        &sal_scale_flag==0&sal_count_flag==0,
        .(exp=0:40,salary=cobs_extrap(exp,salary,min_exp,max_exp)),
        by=.(firm,year,position)]
print(paste("No Parallelisation time: ",Sys.time()-start))
> print(paste("No Parallelisation time: ",Sys.time()-start))
[1] "No Parallelisation time:  1.13971961339315"
> salary_scales_1
       firm year position exp   salary
    1: 0001 1996        4   0 43670.14
    2: 0001 1996        4   1 43674.00
    3: 0001 1996        4   2 43677.76
    4: 0001 1996        4   3 43681.43
    5: 0001 1996        4   4 43684.99
   ---                                
16396: 0010 2015        5  36 44464.02
16397: 0010 2015        5  37 44468.60
16398: 0010 2015        5  38 44471.35
16399: 0010 2015        5  39 44472.27
16400: 0010 2015        5  40 43077.70

It took about 1 minute, 8 seconds to process everything (the elapsed time printed above is in minutes). Note that we only have 10 different firms in our dummy example, which is why the processing time is less dramatic than in your local results.

Now, let's try to perform this task in a parallelised manner. As mentioned, for our example we would like to split up the data per year and assign the smaller subparts to separate cores. We will use the doParallel package for this purpose:

The first thing that we need to do is create a cluster with a particular number of cores. In our example we will try to use all the available cores. Next, we have to register the cluster and export some variables to the global environments of the subnodes. In this case the subnodes only need access to wages. Additionally, the required libraries have to be loaded on the nodes in order to make it work; in this case the nodes need access to both the data.table and cobs libraries. The code looks like this:

library(doParallel)
start <- Sys.time()
cl <- makeCluster(detectCores());
registerDoParallel(cl);
setkey(wages,year); # key on year so that wages[.(i)] below is a keyed (binary) subset
clusterExport(cl,c("wages"),envir=environment());
clusterEvalQ(cl,library("data.table"));
clusterEvalQ(cl,library("cobs"));
salary_scales_2 <- foreach(i = 1996:2015) %dopar%
  {
    subSet <- wages[.(i)] # binary subsetting
    subSet[node_count>=7&ind_count>=10
           &sal_scale_flag==0&sal_count_flag==0,
           .(exp=0:40,
             salary=cobs_extrap(exp,salary,min_exp,max_exp)),
           by=.(firm,year,position)]
  }
stopCluster(cl)
print(paste("With parallelisation time: ",Sys.time()-start))
> print(paste("With parallelisation time: ",Sys.time()-start))
[1] "With parallelisation time:  23.4177722930908"

We now have a list of data.tables, salary_scales_2, that contains the subresults for each individual year. Notice the speedup in processing time: this time it took only 23 seconds instead of the original 1.1 minutes (roughly a 65% improvement). The only thing that we still need to do now is merge the results. We can use do.call("rbind", salary_scales_2) to bind the rows of the tables together (this takes almost no time--.002 seconds on one run). Finally, we also perform a small check to verify that the parallel results are indeed identical to the results of the single-threaded run:

salary_scales_2<-do.call("rbind",salary_scales_2)
identical(salary_scales_1,salary_scales_2)
> identical(salary_scales_1,salary_scales_2)
[1] TRUE

REPLY TO COMMENT: It's a very interesting example indeed, but I think you might be missing the more important issue here. data.table does perform memory- and structure-related optimizations so that you can query and access your data more efficiently. However, in this example there is no major memory or search-related bottleneck, especially not when compared with the total data-crunching time spent inside the cobs function. For example, the line that you changed, subSet <- wages[year==uniqueYears[i],], takes only about 0.04 seconds per call when you time it.
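
For reference, here is a rough sketch of how to check that timing claim on your own machine, assuming the dummy wages table from above and that uniqueYears holds the sorted unique years:

uniqueYears <- sort(unique(wages$year))
# Time just the subsetting step in isolation; the per-call cost is tiny compared
# to the cobs fits (numbers will of course vary by machine).
system.time(for (i in seq_along(uniqueYears)) {
  subSet <- wages[year == uniqueYears[i], ]
})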

If you use a profiler on your runs, you will notice that it is not data.table or any of its operations or groupings that begs for parallelisation; it is the cobs function that takes up almost all of the processing time (and that function doesn't even use a data.table as input). What we are doing in the example is reassigning the total workload of the cobs calls to different cores in order to achieve the speedup. Our intention is not to split up the data.table operations, since they are not costly at all. However, we do have to split up the data.table because we need to split the data for the separate cobs runs. In fact, we even take advantage of data.table's efficiency while splitting and merging the table(s), which adds practically no extra time.
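
If you want to verify this yourself, a minimal sketch with R's built-in sampling profiler, wrapped around the single-threaded call from above, would look something like this; the cobs call and its internals should account for almost all of the sampled time:

Rprof("profile.out")                        # start the sampling profiler
salary_scales_1 <-
  wages[node_count>=7&ind_count>=10
        &sal_scale_flag==0&sal_count_flag==0,
        .(exp=0:40,salary=cobs_extrap(exp,salary,min_exp,max_exp)),
        by=.(firm,year,position)]
Rprof(NULL)                                 # stop profiling
head(summaryRprof("profile.out")$by.total)  # cobs and its helpers should dominate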