使用多核和并行编程加速data.table分组操作

21

我有一大段代码,目前速度瓶颈在于聚合步骤。

我希望加快数据分组步骤的速度。我的代码中,一个简单而非平凡的示例(SNOTE)的数据如下:

library(data.table)
a = sample(1:10000000, 50000000, replace = TRUE)
b = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), 50000000, replace = TRUE)
d = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), 50000000, replace = TRUE)
e = a
dt = data.table(a = a, b = b, d = d, e = e)
system.time(c.dt <- dt[,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[1], by=a)])
   user  system elapsed 
 60.107   3.143  63.534

这个示例的数据量很大,速度相当快,但在我的情况下,我仍在寻找进一步的加速。我有多个内核,所以我几乎可以确定必须有一种方法来利用这样的计算能力。
我愿意将我的数据类型更改为数据框或idata.frame对象(理论上,idata.frame比data.frame更快)。
我做了一些研究,似乎plyr包具有一些并行能力,可能会有所帮助,但我仍在努力学习如何对我正在尝试的分组进行操作。在另一个SO帖子中讨论了一些这些想法。我仍然不确定这种并行化能够实现多少更多的效果,因为它使用foreach函数。根据我的经验,foreach函数不适用于数百万次快速操作,因为核之间的通信工作最终会减慢并行化的效果。

我也没有给你点踩,但是以这种方式(使用字符向量)存储数据似乎会比较慢,并且连接它们只会进一步减慢速度(除非你要将其导出到其他软件中使用),因为你需要一次又一次地分解字符串进行分析。可能你应该使用一个专门的包来处理雪茄...我对这些一无所知,但在之前的问题中已经有人指引你了...https://dev59.com/YHfZa4cB1Zd3GeqPOS7T - Frank
1
我没有点踩。但我会这样做的原因是你没有提供任何关于数据的信息。如果read.index是行索引,那么将每一行分组成一个单独的行肯定会很慢。你将会调用数百万次paste。你使用了Rprof吗?你使用了verbose=TRUE吗?而且你使用了“太慢”的词汇,却没有给出具体数字。事实上,我现在已经自己说服自己要点踩了。如果你改进问题,它可以被撤销。 - Matt Dowle
@MattDowle 我刚刚更新了问题并提供了一个示例。现在我正在将我的大数据分成较小的块来读取,因此不会再过度使用RAM。虽然仍然需要帮助加快这一部分的速度,但我不确定该如何做。 - Dnaiel
2
@Dnaiel 这是一个非常好的问题。+1。我会尝试去看一下。我猜有些回答者只是关注新问题,所以为了得到更多的关注,提供悬赏可能是一个好主意。 - Matt Dowle
1
@MattDowle 非常感谢,我很高兴我解决了这样令人困惑的问题 :-) 不确定它有多好,但这是我正在处理的问题。我正在学习如何提出更好的问题,所以对我来说很好。 - Dnaiel
显示剩余8条评论
2个回答

14

你能用data.table并行聚合吗?可以。

这么做值得吗?不值得。这是前面回答没有强调的关键点。

正如Matt Dowledata.table和并行计算中所解释的那样,在并行运行操作时,需要先进行复制(“分块”),然后再进行分发。这会降低速度。在某些情况下,当你无法使用data.table(例如运行多个线性回归时),将任务分配给核心进行处理才是值得的。但是对于聚合,至少涉及到data.table时并不值得这样做。

简而言之(在证明其它情况之前),使用data.table进行聚合,别再为了潜在的速度提升而烦恼使用doMC。在聚合方面,data.table已经比任何其他可用工具都要快得多,即使没有多核支持!


以下是一些基准测试,你可以自己运行对比data.table内部使用by进行聚合和使用foreachmclapply的结果排列在前面。

#-----------------------------------------------

# TL;DR FINAL RESULTS (Best to Worst)
# 3 replications, N = 10000:
# (1)  0.007 -- data.table using `by`
# (2)  3.548 -- mclapply with rbindlist
# (3)  5.557 -- foreach with rbindlist
# (4)  5.959 -- foreach with .combine = "rbind"
# (5) 14.029 -- lapply

# ----------------------------------------------

library(data.table)

## And used the following to create the dt
N <- 1e4
set.seed(1)
a = sample(1:N, N*2, replace = TRUE)
b = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE)
d = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE)
e = a
dt = data.table(a = a, b = b, d = d, e = e, key="a")
setkey(dt, "a")

# TEST AGGREGATION WITHOUT PARALLELIZATION ---------------------------
## using data.tables `by` to aggregate
round(rowMeans(replicate(3, system.time({
    dt[,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[1], by=a)]
}))), 3)
# [1] 0.007 elapsed for N == 10,000, length(unique(dt[["a"]])) == 8617

## using `lapply`
round(rowMeans(replicate(3, system.time({
    results <- lapply(unique(dt[["a"]]), function(x) {
        dt[.(x), list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[1])]
    })
    rbindlist(results)
}))), 3)
# [1] 14.029 elapsed for N == 10,000

# USING `mclapply` FORKING ---------------------------------
## use mclapply
round(rowMeans(replicate(3, system.time({
    results <- mclapply(unique(dt[["a"]]),
    function(x) {
        dt[.(x), list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
    }, mc.cores=4)
    rbindlist(results)
}))), 3)
# [1] 3.548 elapsed for N == 10,000


# PARALLELIZATION USING `doMC` PACKAGE ---------------------------------
library(doMC)
mc = 4
registerDoMC(cores=mc)
getDoParWorkers()
# [1] 4

## (option a) by Ricardo Saporta
round(rowMeans(replicate(3, system.time({
    foreach(x=unique(dt[["a"]]), .combine="rbind", .inorder=FALSE) %dopar%
    dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
}))), 3)
# [1] 5.959 elapsed for N == 10,000

## (option b) by Ricardo Saporta
round(rowMeans(replicate(3, system.time({
    results <-
      foreach(x=unique(dt[["a"]])) %dopar%
        dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
    rbindlist(results)
}))), 3)
# [1] 5.557 elapsed for N == 10,000

registerDoSEQ()
getDoParWorkers()
# [1] 1

8

如果您有多个可用的核心,则可以利用数据表中使用其键快速过滤和分组行的事实:

library(doMC)
registerDoMC(cores=4)


setkey(dt, "a")

finalRowOrderMatters = FALSE # FALSE can be faster
foreach(x=unique(dt[["a"]]), .combine="rbind", .inorder=finalRowOrderMatters) %dopar% 
     dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]

请注意,如果唯一组数(即length(unique(a)))相对较小,则删除.combine参数会更快,将结果返回到列表中,然后在结果上调用rbindlist。在我的双核&8GB RAM测试中,阈值约为9,000个唯一值。以下是我用来进行基准测试的内容:
# (otion a)
round(rowMeans(replicate(3, system.time({
# ------- #
  foreach(x=unique(dt[["a"]]), .combine="rbind", .inorder=FALSE) %dopar% 
     dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
# ------- #
}))), 3) 
# [1]  1.243 elapsed for N ==  1,000
# [1] 11.540 elapsed for N == 10,000, length(unique(dt[["a"]])) == 8617
# [1] 57.404 elapsed for N == 50,000



# (otion b)
round(rowMeans(replicate(3, system.time({
# ------- #
    results <- 
      foreach(x=unique(dt[["a"]])) %dopar% 
         dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
    rbindlist(results)
# ------- #
}))), 3)
# [1]  1.117 elapsed for N ==  1,000
# [1] 10.567 elapsed for N == 10,000, length(unique(dt[["a"]])) == 8617
# [1] 76.613 elapsed for N == 50,000


## And used the following to create the dt
N <- 5e4
set.seed(1)
a = sample(1:N, N*2, replace = TRUE)
b = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE)
d = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE)
e = a
dt = data.table(a = a, b = b, d = d, e = e, key="a")

4
每个子进程是否需要复制整个数据表,还是它们都访问“主”数据表对象? - Zach

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接