R、dplyr和snow:如何并行化使用dplyr的函数

4

假设我想以并行方式将myfunction应用于myDataFrame的每一行。假设otherDataFrame是一个具有两列COLUNM1_odfCOLUMN2_odf的数据框,用于myfunction中的某些原因。因此,我想使用parApply编写以下代码:

clus <- makeCluster(4)
clusterExport(clus, list("myfunction","%>%"))

myfunction <- function(fst, snd) {
 #otherFunction and aGlobalDataFrame are defined in the global env
 otherFunction(aGlobalDataFrame)

 # some code to create otherDataFrame **INTERNALLY** to this function
 otherDataFrame %>% filter(COLUMN1_odf==fst & COLUMN2_odf==snd)
 return(otherDataFrame)
}
do.call(bind_rows,parApply(clus,myDataFrame,1,function(r) { myfunction(r[1],r[2]) }

问题在于,即使我将 COLUMN1_odfCOLUMN2_odf 插入到 clusterExport 中,R 仍然无法识别它们。我该如何解决这个问题?是否有一种方法可以“导出”所有 snow 需要的对象,以避免枚举每个对象?
编辑1:我已经添加了一条注释(在上面的代码中),以便指定 myfunction 内部创建了 otherDataFrame
编辑2:为了让 myfunction 更通用,我添加了一些伪代码:现在它使用一个全局数据框 (aGlobalDataFrame) 和另一个函数 otherFunction

1
你的 myFunction 函数的参数应该包含所有对象。尝试使用 myFunction <- function(otherDataFrame, fst, snd) {... - Roman Luštrik
实际上,otherDataFrame是在我的函数中创建的对象,因此我无法将其传递给它。 - enneppi
不必使用parApply,可以使用parLapplydo.call(bind_rows,parLapply(clus,1:nrow (myDataFrame),function(i, r) { myfunction(r[i,1],r[i,2]) }。(我还没有测试过这个。它可能仍然需要一些调整) - Benjamin
我的观点仍然成立 - 在不从外部引用它们的情况下将所有必需的对象传递给函数。一旦您生成额外的R并行进程,外部就为空。 - Roman Luštrik
但是我创建otherDataFrame的函数内部:不可避免地,我无法将其传递给该函数。 otherDataFrame是在函数内部使用传递给函数本身的参数创建的对象。 - enneppi
@Benjamin 我会尽快检查你的建议。但是为什么parLapply应该起作用呢?在这种特殊情况下,parLapply有什么不同之处? - enneppi
2个回答

5

我做了一些实验,最终解决了我的问题(在 Benjamin 的建议和我在问题中添加的“编辑”考虑下):

clus <- makeCluster(4)
clusterEvalQ(clus, {library(dplyr); library(magrittr)})
clusterExport(clus, "myfunction", "otherfunction", aGlobalDataFrame)

myfunction <- function(fst, snd) {
 #otherFunction and aGlobalDataFrame are defined in the global env
 otherFunction(aGlobalDataFrame)

 # some code to create otherDataFrame **INTERNALLY** to this function
 otherDataFrame %>% dplyr::filter(COLUMN1_odf==fst & COLUMN2_odf==snd)
 return(otherDataFrame)
}

do.call(bind_rows, parApply(clus, myDataFrame, 1, 
        {function(r) { myfunction(r[1], r[2]) } )

这样,我已经注册了aGlobalDataFramemyfunctionotherfunction,简而言之,所有用于并行化作业(myfunction本身)的函数和数据都已经被注册。


1
现在我不是在手机上看这个东西,我发现了一些问题。
首先,在你的函数中实际上没有创建otherDataFrame。你试图将现有的otherDataFrame导入管道,如果环境中不存在otherDataFrame,则函数将失败。
其次,除非你已经将dplyr包加载到你的集群环境中,否则你将调用错误的filter函数。
最后,当你调用parApply时,你没有指定fst和应该是什么。请尝试以下内容:
clus <- makeCluster(4)
clusterEvalQ(clus, {library(dplyr); library(magrittr)})
clusterExport(clus, "myfunction")

myfunction <- function(otherDataFrame, fst, snd) {
 dplyr::filter(otherDataFrame, COLUMN1_odf==fst & COLUMN2_odf==snd)
}
do.call(bind_rows,parApply(clus,myDataFrame,1,function(r, fst, snd) { myfunction(r[fst],r[snd]), "[fst]", "[snd]") }

我会尽快尝试...虽然otherDataFrame是在函数内部创建的(请阅读我的最后一次编辑),但你的建议是一个好的跟随方式。 - enneppi

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接