如何使用dplyr并行执行do()函数调用

19

我正在尝试着想办法将dplyr::do函数并行部署。在阅读了一些文档后,似乎dplyr::init_cluster()应该足以告诉do()以并行方式运行。不幸的是,当我测试时,情况似乎并非如此:

library(dplyr)
test <- data_frame(a=1:3, b=letters[c(1:2, 1)])

init_cluster()
system.time({
  test %>%
    group_by(b) %>%
    do({
      Sys.sleep(3)
      data_frame(c = rep(max(.$a), times = max(.$a)))
    })
})
stop_cluster()

输出结果如下:

Initialising 2 core cluster.
|==========================================================================|100% ~0 s remaining
   user  system elapsed 
   0.03    0.00    6.03 

如果do调用在两个核心之间分割,我期望结果是3。我可以通过在do()函数中添加一个在主R终端打印输出的语句来确认这一点。那我还缺少什么呢?

我正在使用dplyr 0.4.2和R 3.2.1。


我发现对于非常关键的代码,至少对于我的用例来说,最好的方法是使用Rcpp和OpenMP进行深入研究。这大多超出了我的计算机科学能力范围,但似乎有许多微妙的缓存交互,有时需要仔细地进行分析和基准测试。我还发现良好的数据结构往往会产生最大的差异,并且可以显着帮助并行化。祝你好运! - Jack Wasey
3个回答

26

根据@Maciej的提议,您可以尝试使用multidplyr

## Install from github
devtools::install_github("hadley/multidplyr")
使用partition()可将数据集分割到多个核心上:
library(dplyr)
library(multidplyr)
test <- data_frame(a=1:3, b=letters[c(1:2, 1)])
test1 <- partition(test, a)
你将初始化一个由3个核心组成的集群(每个 a 对应一个核心)。
# Initialising 3 core cluster.

然后只需执行你的do()调用:

test1 %>%
  do({
    dplyr::data_frame(c = rep(max(.$a)), times = max(.$a))
  })

这给出了:

#Source: party_df [3 x 3]
#Groups: a
#Shards: 3 [1--1 rows]
#
#      a     c times
#  (int) (int) (int)
#1     1     1     1
#2     2     2     2
#3     3     3     3

2
谢谢!我看了@Maciej的回答,很棒,终于有这个功能了。我经常做比summarize更复杂的任务,如果没有并行化,我真的找不到dplyr有多有用,就像许多人声称的那样。 - Max Gordon
@MaxGordon 很高兴能帮到你! - Steven Beaupré
你如何将一个用户定义的函数发送给每个节点以便使用do()执行?我收到了“未找到函数”的错误提示。 - Dominik
@Dominik,您能否发布一个带有可重现示例的新问题?我可以尝试解决它。 - Steven Beaupré
4
如果您手动创建集群(cluster),看起来可以通过parallel的clusterExport通常方式实现这一点:cluster <- create_cluster(4) ; clusterExport(cluster,c("userfun1","userfun2","userfun3"))。 - Jan Stanstrup

8

5

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接