引导程序中的变量作用域在多集群并行方法中的应用

4
我正在尝试弄清楚如何在运行并行计算时将函数和包传递给boot()函数。在循环内部加载包或定义函数似乎非常昂贵。我经常使用的foreach()函数有一个 .packages 和 .export 参数来处理这个问题(参见这个SO 问题),但我无法找到如何在boot包中实现这一点。
下面是一个无意义的示例,展示了切换到并行计算时会发生什么:
library(boot)
myMean <- function(x) mean(x)
meaninglessTest <- function(x, i){
  return(myMean(x[i]))
}

x <- runif(1000)

bootTest <- function(){
  out <- boot(data=x, statistic=meaninglessTest, R=10000, parallel="snow", ncpus=4)
  return(boot.ci(out, type="perc"))
}

bootTest()

根据预期,抱怨无法找到myMean

附注:运行此示例时,速度比单核慢,可能因为将这个简单的任务分成多个核比实际任务更耗时。为什么默认不将任务平均分配给R/ncpus个工作批次,这个默认行为有什么原因吗?

关于附注的更新:正如Steve Weston所指出的,boot()使用的parLapply实际上将任务分成均匀的批次/块。该函数是clusterApply的一个很好的包装器:

docall(c, clusterApply(cl, splitList(x, length(cl)), lapply, 
    fun, ...))

当我增加重复次数时,我对这个表现不佳感到有些惊讶:

> library(boot)
> set.seed(10)
> x <- runif(1000)
> 
> Reps <- 10^4
> start_time <- Sys.time()
> res <- boot(data=x, statistic=function(x, i) mean(x[i]), 
+             R=Reps, parallel="no")
> Sys.time()-start_time
Time difference of 0.52335 secs
> 
> start_time <- Sys.time()
> res <- boot(data=x, statistic=function(x, i) mean(x[i]), 
+             R=Reps, parallel="snow", ncpus=4)
> Sys.time()-start_time
Time difference of 3.539357 secs
> 
> Reps <- 10^5
> start_time <- Sys.time()
> res <- boot(data=x, statistic=function(x, i) mean(x[i]), 
+             R=Reps, parallel="no")
> Sys.time()-start_time
Time difference of 5.749831 secs
> 
> start_time <- Sys.time()
> res <- boot(data=x, statistic=function(x, i) mean(x[i]), 
+             R=Reps, parallel="snow", ncpus=4)
> Sys.time()-start_time
Time difference of 23.06837 secs

我希望这只是由于非常简单的均值函数,更复杂的情况应该表现得更好。我必须承认,我觉得有点不安,因为集群初始化时间在10,000和100,000个案例中应该是相同的,然而绝对时间差异增加,4核版本需要5倍的时间。我猜这一定是列表合并的影响,因为我找不到其他解释。


boot 还允许您使用 cl 参数指定一个簇。创建一个簇并以这种方式传递它对我很有效。这对您来说是一个选项吗? - BenBarnes
1个回答

3

如果要并行执行的函数(在此例中为 meaninglessTest)有额外的依赖关系(如 myMean),标准解决方案是通过 clusterExport 函数将这些依赖项导出到集群。 这需要创建一个集群对象,并将其通过 "cl" 参数传递给 boot

library(boot)
library(parallel)
myMean <- function(x) mean(x)
meaninglessTest <- function(x, i){
  return(myMean(x[i]))
}
cl <- makePSOCKcluster(4)
clusterExport(cl, 'myMean')

x <- runif(1000)

bootTest <- function() {
  out <- boot(data=x, statistic=meaninglessTest, R=10000,
              parallel="snow", ncpus=4, cl=cl)
  return(boot.ci(out, type="perc"))
}

bootTest()
stopCluster(cl)

请注意,一旦集群工作程序初始化完成,它们可以被boot多次使用,并且不需要重新初始化,因此这并不是很昂贵。
要在集群工作程序上加载软件包,您可以使用clusterEvalQ
clusterEvalQ(cl, library(randomForest))

这很简单易懂,但对于更复杂的工人初始化,我通常会创建一个“worker init”函数并通过clusterCall执行它,这非常适合在每个工人上执行一次函数。

关于你的附注,性能表现不佳是因为统计函数做了很少的工作,正如你所说,但我不确定为什么你认为工作没有被平均分配给工人。在这种情况下,parLapply函数用于并行处理工作,并且它确实有效地平均分配和高效地完成了工作,但这并不能保证比使用lapply顺序运行更好。但也许我误解了你的问题。


很棒的答案,不如foreach直观,但它应该能够完成工作。我还没有看过boot parallel代码,但由于性能非常糟糕,我假设它为每个集群执行了一次meaninglessTest,而不是在每个集群中执行250次,然后组合输出。当我使用foreach进行模拟时,这种方法的速度更快,尽管也许那只是巧合。 - Max Gordon
@MaxGordon 我现在更好地理解你了。我所说的技术是“分块”,它可以极大地提高性能。parLapply确实执行分块,但即使是分块也有其限制。 - Steve Weston
谢谢,我看了引导代码,你关于分块的说法是正确的,我已经更新了问题并提供了相关细节。在这些过度简化的示例中,列表合并似乎相当昂贵,我有点惊讶没有更有效的处理简单合并的方法,但正如你所指出的那样 - 任何需要引导的东西都不会像mean()一样简单。 - Max Gordon

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接