运行if循环并行化

3

我有一个包含大约400万行数据的数据集需要循环遍历。数据结构是,有重复的ID,这些ID彼此依赖,但是ID之间的数据是独立的。对于每个ID,[i+1]行依赖于[i]行。以下是一个可重现的示例。我知道这个示例在内部函数方面并不实用,但它只是展示我所拥有的结构。

set.seed(123)

id1 = rep(1,5)
id2 = rep(2,5)
id3 = rep(3,5)
ids = c(id1,id2,id3)

month = rep(seq(1,5),3)

x = round(rnorm(15,2,5))
y = rep(0,15)

df = as.data.frame(cbind(ids,month,x,y))

for (i in 1:nrow(df)){
  if(i>1 && df[i,1]==df[i-1,1]){
    #Main functions go here
    df[i,4] = df[i-1,4]^2+df[i,3]
  }
  else {
    df[i,4] = 1
  }
}

实际上,真实函数的1000个循环需要大约90秒,因此处理400万行需要数天。这种方式对我来说是不可行的。然而,这些ID是独立的,不需要一起运行。我的问题是:是否有一种方法可以并行运行此类循环?一个非常不优雅的解决方案是将文件分成50个部分,而不会分割ID,并在50个子文件上运行相同的代码。我想应该有一种编码的方法来解决这个问题。
编辑:添加月份列以显示为什么行彼此相关。回答下面两个评论:
1)实际上有6-7行函数要运行。我能用ifelse()和多个函数吗? 2)期望的输出将是完整的数据框。实际上有更多的列,但我需要数据框中的每一行。
   ids month  x      y
1    1     1 -1      1
2    1     2  1      2
3    1     3 10     14
4    1     4  2    198
5    1     5  3  39207
6    2     1 11      1
7    2     2  4      5
8    2     3 -4     21
9    2     4 -1    440
10   2     5  0 193600
11   3     1  8      1
12   3     2  4      5
13   3     3  4     29
14   3     4  3    844
15   3     5 -1 712335

编辑2:我尝试使用另一篇帖子中的foreach()包,但似乎不起作用。这段代码能够运行,但我认为问题在于行是如何在核心之间分配的。如果每行都按顺序发送到不同的核心,则相同的ID永远不会在同一个核心中。

library(foreach)
library(doParallel)


set.seed(123)

id1 = rep(1,5)
id2 = rep(2,5)
id3 = rep(3,5)
ids = c(id1,id2,id3)

month = rep(seq(1,5),3)

x = round(rnorm(15,2,5))
y = rep(0,15)

df = as.data.frame(cbind(ids,month,x,y))

#setup parallel backend to use many processors
cores=detectCores()
cl <- makeCluster(cores[1]-1) #not to overload your computer
registerDoParallel(cl)

finalMatrix <- foreach(i=1:nrow(df), .combine=cbind) %dopar% {

  for (i in 1:nrow(df)){
    if(i>1 && df[i,1]==df[i-1,1]){
      #Main functions go here
      df[i,4] = df[i-1,4]^2+df[i,3]
    }
    else {
      df[i,4] = 1
    }
  }
}
#stop cluster
stopCluster(cl)

FYI ifelse是矢量化的。 - Sotos
请提供几行期望的输出。 - YOLO
两个问题:1. 您的每个id是否始终具有相同数量的观察值? 2. 您的函数中的所有操作都是基本数学运算吗?(加,减,乘,除,指数等),如果是,则可能可以使用向量化的矩阵运算来执行此操作。 - Matt Summersgill
数据集中有多少个不同的ID(大约)? - Val
大约有90,000个不同的ID,总共有约4百万行。 - user137698
显示剩余6条评论
3个回答

3

因此,只需使用Rcpp重新编码您的循环:

#include <Rcpp.h>
using namespace Rcpp;

// [[Rcpp::export]]
NumericVector fill_y(const NumericVector& x) {

  int n = x.length();
  NumericVector y(n); y[0] = 1;
  for (int i = 1; i < n; i++) {
    y[i] = pow(y[i - 1], 2) + x[i];
  }
  return y;
}

并使用 dplyr 将它应用于每个组:

df %>%
  group_by(ids) %>%
  mutate(y2 = fill_y(x))

我认为这个速度应该足够快,所以你不需要使用并行处理。 实际上,我在 @Val 的 testdat 上运行它只花了2秒钟(使用一台旧电脑)。

如果可以,请告诉我。否则,我将制作一个并行版本。


你提出了一个令人信服的理由,让我们更加熟悉 C++,特别是在不可避免的串行任务中!使用你的代码与 data.table 结合使用 -- df[,y := fill_y(x), by = ids] 对我来说只需要 0.35 秒!(不包括 3.4 秒的编译时间) - Matt Summersgill
加速的话给你点赞。对于那些没有使用过Rcpp的人,你应该更详细地解释如何实际使用这个函数(比如如何引用等)。 - Val
是的,你说得对,我实际上应该为每次需要解释这个问题存储一个预先编写好的句子。基本上,将其放在“.cpp”文件中,并使用RStudio进行“源代码”处理。我也很乐意接受更好的解释:') - F. Privé
谢谢您的回复!问题是循环内部的函数实际上是相当费计算的。这是一个金融程序,内部循环包含ppmt()/ipmt()函数以及矩阵乘法。总的来说,有9个新列是前一行和当前行的函数。这就是我认为需要并行运行的原因。不过我今天会尝试这个结构。 - user137698

1

基础 R 矩阵操作以及 data.table 中的 melt/dcast

如上面的评论所讨论的那样,这种解决方案非常特定于示例中的用例,但也许适用于您的用例。

使用矩阵操作和来自 data.table 包的 dcast.data.tablemelt.data.table 函数,快速地从长格式转换为宽格式,反之亦然,效率相当高。

总体而言,更大的限制很可能是可用的 RAM 而不是处理时间。

library(data.table)
set.seed(123)

id1 = rep(1,5)
id2 = rep(2,5)
id3 = rep(3,5)
ids = c(id1,id2,id3)

month = rep(seq(1,5),3)

x = round(rnorm(15,2,5))
# y = rep(0,15) ## no need to pre-define y with this method

df = as.data.frame(cbind(ids,month,x))
setDT(df) ## Convert to data.table by reference

wide <- dcast.data.table(df, month ~ ids, value.var = "x") ## pivot to 'wide' format

mat <- data.matrix(wide[,-c("month")]) ## Convert to matrix
print(mat)

给出
      1  2  3
[1,] -1 11  8
[2,]  1  4  4
[3,] 10 -4  4
[4,]  2 -1  3
[5,]  3  0 -1

然后将其视为矩阵进行操作:
mat[1,] <- 1 ## fill the first row with 1's as in your example

for (i in 2:nrow(mat)){
  mat[i,] = mat[i-1L,]^2 + mat[i,]
}

print(mat)

给出
         1      2      3
[1,]     1      1      1
[2,]     2      5      5
[3,]    14     21     29
[4,]   198    440    844
[5,] 39207 193600 712335

接下来,将数据转换为长格式,然后根据关键列idsmonth与原始数据重新连接起来。
yresult <- as.data.table(mat) ## convert back to data.table format
yresult[,month := wide[,month]] ## Add back the month column

ylong <- melt.data.table(yresult,
                         id.vars = "month",
                         variable.factor = FALSE,
                         variable.name = "ids",
                         value.name = "y") ## Pivot back to 'long' format

ylong[,ids := as.numeric(ids)] ## reclass ids to match input ids

setkey(ylong, ids, month) ## set keys for join on 'ids' and 'month'
setkey(df, ids,month)

merge(df,ylong) ## join data.table with the result

产生最终结果的代码如下:
    ids month  x      y
 1:   1     1 -1      1
 2:   1     2  1      2
 3:   1     3 10     14
 4:   1     4  2    198
 5:   1     5  3  39207
 6:   2     1 11      1
 7:   2     2  4      5
 8:   2     3 -4     21
 9:   2     4 -1    440
10:   2     5  0 193600
11:   3     1  8      1
12:   3     2  4      5
13:   3     3  4     29
14:   3     4  3    844
15:   3     5 -1 712335

规模测试

为了测试和说明缩放,下面的函数testData通过交叉连接给定数量的ID和月份数生成数据集。然后,函数testFunc执行递归的逐行矩阵操作。

testData <- function(id_count, month_count) {

  id_vector <- as.numeric(seq_len(id_count))
  months_vector <- seq_len(month_count)

  df <- CJ(ids = id_vector,month = months_vector)
  df[,x := rnorm(.N,0,0.1)]
  return(df)
}

testFunc <- function(df) {
  wide <- dcast.data.table(df,month ~ ids, value.var = "x")

  mat <- data.matrix(wide[,-c("month")])

  mat[1,] <- 1

  for (i in 2:nrow(mat)){
    mat[i,] = mat[i-1L,]^2 + mat[i,]
  }

  yresult <- as.data.table(mat)
  yresult[,month := wide[,month]]

  ylong <- melt.data.table(yresult,
                           id.vars = "month",
                           variable.factor = FALSE,
                           variable.name = "ids",
                           value.name = "y")

  ylong[,ids := as.numeric(ids)]

  setkey(ylong, ids, month)
  setkey(df, ids,month)

  merge(df,ylong)
}

有90,000个ids和45个months

foo  <- testData(90000,45)

system.time({
  testFunc(foo)
})

   user  system elapsed 
  8.186   0.013   8.201 

使用单线程,运行时间在10秒以内

有100,000个ids和1,000个months的情况:

这个三列输入数据表大约为1.9GB。

foo  <- testData(1e5,1e3)

system.time({
  testFunc(foo)
})

   user  system elapsed 
 52.790   4.046  57.031 

一个单线程的运行时间少于一分钟,这似乎相当可控,具体取决于需要运行多少次。总是可以通过改进代码或将递归部分转换为 C++ 并使用 Rcpp 来进一步加快速度,但避免在工作流程中学习 C++ 和在语言之间切换的心理负担总是很好的选择!保留html标签。

1
这是一个使用foreach的解决方案。很难说它在你的实际例子中会怎样运作,但至少它能够处理测试数据...
首先,我生成一些测试数据:
# function to generate testdata

genDat <- function(id){

  # observations per id, fixed or random
  n <- 50
  #n <- round(runif(1,5,1000))

  return(

    data.frame(id=id,month=rep(1:12,ceiling(n/12))[1:n],x=round(rnorm(n,2,5)),y=rep(0,n))

  )
}

#generate testdata

testdat <- do.call(rbind,lapply(1:90000,genDat))


> head(testdat)
  id month  x y
1  1     1  7 0
2  1     2  6 0
3  1     3 -9 0
4  1     4  3 0
5  1     5 -9 0
6  1     6  8 0


> str(testdat)
'data.frame':   4500000 obs. of  4 variables:
 $ id   : int  1 1 1 1 1 1 1 1 1 1 ...
 $ month: int  1 2 3 4 5 6 7 8 9 10 ...
 $ x    : num  7 6 -9 3 -9 8 -4 13 0 5 ...
 $ y    : num  0 0 0 0 0 0 0 0 0 0 ...

所以测试数据有大约450万行,其中有9万个唯一的ID。
现在,由于您的计算在ID之间是独立的,因此想法是将带有唯一ID的数据发送到每个核心...这最终也会摆脱对ififelse条件的必要性。
为了做到这一点,我首先生成一个具有起始和停止行索引的矩阵,以分割唯一ID的数据集:
id_len <- rle(testdat$id)

ixmat <- cbind(c(1,head(cumsum(id_len$lengths)+1,-1)),cumsum(id_len$lengths))

这个矩阵可以传递给foreach,以便并行运行特定部分。
在这个例子中,我稍微修改了你的计算方式,避免出现导致Inf的天文数字。
library(parallel)
library(doParallel)
library(iterators)

cl <- makeCluster(parallel::detectCores())
registerDoParallel(cl)   #create a cluster


r <-  foreach (i = iter(ixmat,by='row')) %dopar% {

  x <- testdat$x[i[1,1]:i[1,2]]
  y <- testdat$y[i[1,1]:i[1,2]]
  y[1] <- 1

  for(j in 2:length(y)){
    #y[j] <- (y[j-1]^2) + x[j] ##gets INF
    y[j] <- y[j-1] + x[j]
    }

  return(y)
}

parallel::stopCluster(cl)

最后,您可以替换原始数据框中的值:
testdat$y <- unlist(r)

关于时间,foreach循环在我的8核机器上大约需要40秒。

这正是我想到的框架!我正试图将您的模板修改为我的代码,因为实际上有大约9列需要在循环中运行,但我认为我可以进行调整并在今天进行测试。从概念上讲,这就是我所希望的 - 将每个ID块发送到核心。 - user137698

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接