使用doMC替代并行plyr

9
考虑对数据框执行标准的分组操作:
library(plyr)
library(doMC)
library(MASS) # for example

nc <- 12
registerDoMC(nc)

d <- data.frame(x = c("data", "more data"), g = c("group1", "group2"))
y <- "some global object"

res <- ddply(d, .(g), function(d_group) {
   # slow, complicated operations on d_group
}, .parallel = FALSE)

只需简单地将.parallel=TRUE写入代码中,就可以轻松利用多核设置。这是plyr中我最喜欢的功能之一。

但由于plyr已被弃用(我想),并且基本上被dplyr、purrr等替代,因此并行处理的解决方案变得更加冗长:

library(dplyr)
library(multidplyr)
library(parallel)
library(MASS) # for example

nc <- 12

d <- tibble(x = c("data", "more data"), g = c("group1", "group2"))
y <- "some global object"

cl <- create_cluster(nc)
set_default_cluster(cl)
cluster_library(cl, packages = c("MASS"))
cluster_copy(cl, obj = y)

d_parts <- d %>% partition(g, cluster = cl)
res <- d_parts %>% collect() %>% ungroup()

rm(d_parts)
rm(cl)

您可以想象,在循环中需要每个包和对象时,需要使用自己的cluster_*命令将其复制到节点上,因此这个示例可能会变得非常长。未并行化的plyr-to-dplyr转换只是一个简单的dplyr::group_by构造,不幸的是,没有一种简洁的方法来启用并行处理。因此,我的问题是:
  • 这实际上是从plyr转换到dplyr的首选方式吗?
  • 在plyr背后发生了什么神奇的事情,使得轻松开启并行处理成为可能?是否有任何原因导致将此功能添加到dplyr特别困难,因此它还不存在?
  • 我的两个示例在代码执行方面基本上有什么不同?

3
关于你的第三个问题:我的答案是肯定的。你的“plyr”示例使用的是“doMC”,它是用于“foreach”的多核后端,也就是说:分叉。而你的“multidplyr”示例使用的是“create_cluster”,默认使用的是“parallel::makePSOCKcluster”,也就是:并行 SOCKet 集群 - Aurèle
1
关于您的第二个问题:如果没有预先设置群集,只是调用partition(),会发生与之前相同的魔法:plyr依赖于以前注册的foreach后端(print(plyr ::: setup_parallel)),没有群集的multidplyr :: partition()隐式地依赖于create_cluster(),但可能会检测到另一个已经注册的后端(尽管我没有检查,请参见print(multidplyr ::: cluster_exists))multidplyr手册的第一个示例说明了这种仅需调用partition()而无需进行先前设置的能力。 - Aurèle
1
关于你的第一个问题:据我所知,从文档和我的实验来看,multidplyr 不像 plyr 那样允许分叉,只能使用 PSOCK - Aurèle
1个回答

3
  1. 我认为没有一种真正的“首选”方法将 {plyr} 代码转换为 {dplyr}。

  2. 在评论中,@Aurèle比我更好地描述了{plyr}和{doMC}之间的联系。发生的一件事是激励机制有些变化。{doMC}来自Revolution Analytics(后被Microsoft收购)。但是,开发dplyr的Hadley目前在RStudio工作。这两家公司在IDE领域竞争。因此,他们的软件包可能不太容易协同工作。我在RStudio看到的唯一形式的并行性的强大支持是{sparklyr},他们相对“容易”设置。但是,我不能真正推荐使用Spark来进行单台机器的并行处理。

  3. @Aurèle再次很好地解释了执行差异。您的新代码使用PSOCK集群,而旧代码使用forks。Forks使用写时复制模式访问RAM,因此并行进程可以立即在fork后访问相同的数据。PSOCK群集就像生成R的新副本-它们必须加载库并接收数据的显式副本。

您可以使用类似于...的模式。

library(dplyr)
library(purrr)
library(future)
plan(multicore)
options(mc.cores = availableCores())
d <- data.frame(x = 1:8, g = c("group1", "group2", "group3", "group4"))
y <- "some global object"


split(d, d$g) %>% 
  map(~ future({Sys.sleep(5);mean(.x$x)})) %>% 
  map_df(~value(.x))

在使用map_df步骤进行一些并行处理时,需要一些技巧。请注意,在{purrr}下,~是匿名函数语法,其中.x是映射的值。

如果你喜欢冒险,也许可以通过在{purrr}中使用一个私有方法来创建类似的东西的版本,而不使用{future}。

mcmap <- function(.x, .f, ...) {
  .f <- as_mapper(.f, ...)
  mclapply(.x, function(.x) {
    force(.f)
    .Call(purrr:::map_impl, environment(), ".x", ".f", "list")
  }) %>%
    map(~ .x[[1]])
}

谢谢解释。我还没有尝试过这段代码,但purrr+future可能是一个不错的解决方案。 - Devin

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接