加速插值练习

4

我正在对大约120万个观测值进行约45,000次本地线性回归(基本上是这样),因此我希望能得到一些帮助,以加快速度,因为我很不耐烦。

我基本上正在尝试构建按年份和职位划分的工资合同--函数工资(给定公司、年份、职位经验)--适用于一堆公司。

以下是我正在处理的数据(基本结构)集:

> wages
         firm year position exp salary
      1: 0007 1996        4   1  20029
      2: 0007 1996        4   1  23502
      3: 0007 1996        4   1  22105
      4: 0007 1996        4   2  23124
      5: 0007 1996        4   2  22700
     ---                              
1175141:  994 2012        5   2  47098
1175142:  994 2012        5   2  45488
1175143:  994 2012        5   2  47098
1175144:  994 2012        5   3  45488
1175145:  994 2012        5   3  47098

我希望构建所有公司的经验水平从0到40的工资函数,例如:

> salary_scales
        firm year position exp   salary
     1: 0007 1996        4   0       NA
     2: 0007 1996        4   1 21878.67
     3: 0007 1996        4   2 23401.33
     4: 0007 1996        4   3 23705.00
     5: 0007 1996        4   4 24260.00
    ---                                
611019: 9911 2015        4  36       NA
611020: 9911 2015        4  37       NA
611021: 9911 2015        4  38       NA
611022: 9911 2015        4  39       NA
611023: 9911 2015        4  40       NA

为此,我按照@BondedDust的建议(在这里)使用了COBS(受限B样条)软件包,使我能够将工资合同的单调性融入到模型中。
但仍存在一些问题; 特别是在我需要外推时(每个公司没有年轻员工或老龄员工),拟合曲线往往会失去单调性或低于0。
为了解决这个问题,我使用简单的线性外推来扩大数据边界 - 在min_expmax_exp之外延伸拟合曲线,使其通过最低(或最高)的两个拟合点——虽然不完美,但似乎效果不错。
考虑到这一点,这是我的工作方式(请记住我是data.table的狂热者):
#get the range of experience for each firm
wages[,min_exp:=min(exp),by=.(year,firm,position)]
wages[,max_exp:=max(exp),by=.(year,firm,position)]
#Can't interpolate if there are only 2 or 3 unique experience cells represented
wages[,node_count:=length(unique(exp)),by=.(year,firm,position)]
#Nor if there are too few teachers
wages[,ind_count:=.N,by=.(year,firm,position)]
#Also troublesome when there is little variation in salaries like so:
wages[,sal_scale_flag:=mean(abs(salary-mean(salary)))<50,by=.(year,firm,position)]
wages[,sal_count_flag:=length(unique(salary))<5,by=.(year,firm,position)]

cobs_extrap<-function(exp,salary,min_exp,max_exp,
                      constraint="increase",print.mesg=F,nknots=8,
                      keep.data=F,maxiter=150){
  #these are passed as vectors
  min_exp<-min_exp[1]
  max_exp<-min(max_exp[1],40)
  #get in-sample fit
  in_sample<-predict(cobs(x=exp,y=salary,
                          constraint=constraint,
                          print.mesg=print.mesg,nknots=nknots,
                          keep.data=keep.data,maxiter=maxiter),
                     z=min_exp:max_exp)[,"fit"]

  #append by linear extension below min_exp
  c(if (min_exp==1) NULL else in_sample[1]-
      (min_exp:1)*(in_sample[2]-in_sample[1]),in_sample,
    #append by linear extension above max_exp
    if (max_exp==40) NULL else in_sample[length(in_sample)]+(1:(40-max_exp))*
      (in_sample[length(in_sample)]-in_sample[length(in_sample)-1]))
}

salary_scales<-
  wages[node_count>=7&ind_count>=10
               &sal_scale_flag==0&sal_count_flag==0,
               .(exp=0:40,
                 salary=cobs_extrap(exp,salary,min_exp,max_exp)),
               by=.(year,firm,position)]

有没有什么特别的事情可能会拖慢我的代码?还是我必须耐心等待?

以下是一些较小的公司-职位组合,可以进行试玩:

    firm year position exp salary count
 1: 0063 2010        5   2  37433    10
 2: 0063 2010        5   2  38749    10
 3: 0063 2010        5   4  38749    10
 4: 0063 2010        5   8  42700    10
 5: 0063 2010        5  11  47967    10
 6: 0063 2010        5  15  50637    10
 7: 0063 2010        5  19  51529    10
 8: 0063 2010        5  23  50637    10
 9: 0063 2010        5  33  52426    10
10: 0063 2010        5  37  52426    10
11: 9908 2006        4   1  26750    10
12: 9908 2006        4   6  36043    10
13: 9908 2006        4   7  20513    10
14: 9908 2006        4   8  45023    10
15: 9908 2006        4  13  33588    10
16: 9908 2006        4  15  46011    10
17: 9908 2006        4  15  37179    10
18: 9908 2006        4  22  43704    10
19: 9908 2006        4  28  56078    10
20: 9908 2006        4  29  44866    10

你似乎打错了字。exp是你的函数的参数,但它没有被使用;而total_exp_floor却被使用了(代替了exp参数)。 - Frank
照顾好了,对此很抱歉。 - MichaelChirico
1
您IP地址为143.198.54.68,由于运营成本限制,当前对于免费用户的使用频率限制为每个IP每72小时10次对话,如需解除限制,请点击左下角设置图标按钮(手机用户先点击左上角菜单按钮)。 - Roland
1
似乎所有的瓶颈都在cobs中,所以我认为我不会有任何运气尝试加速...除非我们能想出一种并行调用cobs的方法。 - MichaelChirico
1
似乎cobs是以纯R代码实现的。如果您需要足够的CPU时间,可以证明您可以用一两周的编程时间将其转换为C/C++代码(例如使用Rcpp)。或者您确实可以尝试并行化,但是您必须放弃纯数据表解决方案。 - Roland
哦,我以为在他们的GitHub页面上看到了一些C代码,我还以为他们最终会把它转换成C语言。 - MichaelChirico
1个回答

5
你的代码中有很多可以改进的地方,但我们先关注主要瓶颈。手头的问题可以被视为一个"尴尬并行"的问题。这意味着你的数据可以分成多个小块,每个小块都可以在单独的线程上进行计算,而不需要额外的开销。
为了看到当前问题的并行化可能性,你应该首先注意到,你正在分别为每个公司和/或年份执行完全相同的计算。例如,你可以将计算分成较小的子任务,针对每个单独的年份,然后将这些子任务分配给不同的CPU/GPU核心。通过这种方式可以获得显著的性能提升。最后,在子任务的处理完成后,你唯一需要做的就是合并结果。
然而,R及其所有内部库都是单线程运行的。你必须明确地拆分数据,然后将子任务分配给不同的核心。为了实现这一点,存在许多支持多线程的R包。在这里,我们将使用"doparallel"包作为示例。
由于你没有提供足够大的显式数据集来有效测试性能,因此我们首先创建一些随机数据:
set.seed(42)
wages<-data.table(firm=substr(10001:10010,2,5)[sample(10,size=1e6,replace=T)],
                  year=round(unif(1e6,1996,2015)),
                  position=round(runif(1e6,4,5)),
                  exp=round(runif(1e6,1,40)),
                  salary=round(exp(rnorm(1e6,mean=10.682,sd=.286))))
> wages
         firm year position exp salary
      1: 0001 1996        4  14  66136
      2: 0001 1996        4   3  42123
      3: 0001 1996        4   9  46528
      4: 0001 1996        4  11  35195
      5: 0001 1996        4   2  43926
     ---                              
 999996: 0010 2015        5  11  43140
 999997: 0010 2015        5  23  64025
 999998: 0010 2015        5  31  35266
 999999: 0010 2015        5  11  36267
1000000: 0010 2015        5   7  44315

现在,让我们运行你代码的第一部分:
#get the range of experience for each firm
wages[,min_exp:=min(exp),by=.(year,firm,position)]
wages[,max_exp:=max(exp),by=.(year,firm,position)]
#Can't interpolate if there are only 2 or 3 unique experience cells represented
wages[,node_count:=length(unique(exp)),by=.(year,firm,position)]
#Nor if there are too few teachers
wages[,ind_count:=.N,by=.(year,firm,position)]
#Also troublesome when there is little variation in salaries like so:
wages[,sal_scale_flag:=mean(abs(salary-mean(salary)))<50,by=.(year,firm,position)]
wages[,sal_count_flag:=length(unique(salary))<5,by=.(year,firm,position)]
> wages
         firm year position exp salary min_exp max_exp node_count ind_count sal_scale_flag sal_count_flag
      1: 0001 1996        4  14  66136       1      40         40      1373          FALSE          FALSE
      2: 0001 1996        4   3  42123       1      40         40      1373          FALSE          FALSE
      3: 0001 1996        4   9  46528       1      40         40      1373          FALSE          FALSE
      4: 0001 1996        4  11  35195       1      40         40      1373          FALSE          FALSE
      5: 0001 1996        4   2  43926       1      40         40      1373          FALSE          FALSE
     ---                                                                                                 
 999996: 0010 2015        5  11  43140       1      40         40      1326          FALSE          FALSE
 999997: 0010 2015        5  23  64025       1      40         40      1326          FALSE          FALSE
 999998: 0010 2015        5  31  35266       1      40         40      1326          FALSE          FALSE
 999999: 0010 2015        5  11  36267       1      40         40      1326          FALSE          FALSE
1000000: 0010 2015        5   7  44315       1      40         40      1326          FALSE          FALSE

现在我们将像以前一样以单线程的方式处理wages。请注意,我们首先保存原始数据,以便稍后可以对其执行多线程操作并比较结果:

start <- Sys.time()
salary_scales_1 <-
  wages[node_count>=7&ind_count>=10
        &sal_scale_flag==0&sal_count_flag==0,
        .(exp=0:40,salary=cobs_extrap(exp,salary,min_exp,max_exp)),
        by=.(firm,year,position)]
print(paste("No Parallelisation time: ",Sys.time()-start))
> print(paste("No Parallelisation time: ",Sys.time()-start))
[1] "No Parallelisation time:  1.13971961339315"
> salary_scales_1
       firm year position exp   salary
    1: 0001 1996        4   0 43670.14
    2: 0001 1996        4   1 43674.00
    3: 0001 1996        4   2 43677.76
    4: 0001 1996        4   3 43681.43
    5: 0001 1996        4   4 43684.99
   ---                                
16396: 0010 2015        5  36 44464.02
16397: 0010 2015        5  37 44468.60
16398: 0010 2015        5  38 44471.35
16399: 0010 2015        5  39 44472.27
16400: 0010 2015        5  40 43077.70

处理所有内容大约需要1分钟8秒钟。请注意,我们的实例中仅有10个不同的公司,这就是为什么处理时间与您本地的结果相比并不显著的原因。
现在,让我们尝试以并行方式执行此任务。 如前所述,对于我们的示例,我们希望按年份拆分数据并将较小的子部分分配到单独的核心。 我们将使用 doParallel 软件包来实现此目的:
我们需要做的第一件事是创建一个具有特定核心数的集群。 在我们的示例中,我们将尝试使用所有可用的核心。 接下来,我们将必须注册集群并将一些变量导出到子节点的全局环境中。 在这种情况下,子节点只需要访问 wages 。 此外,某些依赖库还需要在节点上进行评估才能使其正常工作。 在这种情况下,节点需要访问 data.frame cobs 库。 代码如下:
library(doParallel)
start <- Sys.time()
cl <- makeCluster(detectCores()); 
registerDoParallel(cl); 
clusterExport(cl,c("wages"),envir=environment());
clusterEvalQ(cl,library("data.table"));
clusterEvalQ(cl,library("cobs"));
salary_scales_2 <- foreach(i = 1996:2015) %dopar%
  {
    subSet <- wages[.(i)] # binary subsetting
    subSet[node_count>=7&ind_count>=10
           &sal_scale_flag==0&sal_count_flag==0,
           .(exp=0:40,
             salary=cobs_extrap(exp,salary,min_exp,max_exp)),
           by=.(firm,year,position)]
  }
stopCluster(cl)
print(paste("With parallelisation time: ",Sys.time()-start))
> print(paste("With parallelisation time: ",Sys.time()-start))
[1] "With parallelisation time:  23.4177722930908"

我们现在有一个数据表列表salary_scales_2,其中包含每个年份的子结果。请注意处理时间的加速:这次只花费了23秒,而不是原来的1.1分钟(65%的提升)。我们现在唯一需要做的就是合并结果。我们可以使用do.call("rbind", salary_scales_2)来合并表格的行(这几乎不需要时间--在一个运行中只需0.002秒)。最后,我们还进行了一个小检查,以验证多线程的结果确实与单线程运行的结果相同:
salary_scales_2<-do.call("rbind",salary_scales_2)
identical(salary_scales_1,salary_scales_2)
> identical(salary_scales_1,salary_scales_2)
[1] TRUE

回复评论

这是一个非常有趣的例子,但我认为您可能忽略了更重要的问题。事实上,data.table 确实执行了与内存和结构相关的优化,以使您能够以更高效的方式查询和访问数据。然而,在本例中,没有重大的内存或搜索相关瓶颈,特别是当您将其与 cobs 函数中实际的总数据处理时间进行比较时。例如,您更改的一行代码 subSet <- wages[year==uniqueYears[i],] 每次调用仅需约 0.04 秒。
如果你在运行中使用分析器,你会发现需要并行化的不是 data.table 或任何它的操作或分组,而是占用几乎所有处理时间的 cobs 函数(而该函数甚至不使用 data.table 作为输入)。我们在这个例子中试图将总的工作负载重新分配到不同的核心上以实现加速。我们的意图并不是拆分 data.table 操作,因为它们根本不费时间。但是,我们确实需要拆分我们的 data.table,因为我们需要为单独的 cobs 函数运行分割数据。实际上,在分割和合并表格时,我们甚至利用了 data.table 在所有方面的高效性。这根本没有额外的时间开销。

我想强调的是,这似乎是一个典型的例子,data.table确实需要并行化的地方 - 在data.table内置加速和并行化复制之间的权衡明显倾向于后者(例如,请参见此处)。基本上,我的组内计算需要足够长的时间,以至于复制所花费的时间很容易被超过。 - MichaelChirico

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接