R中的并行计算:如何使用处理器核心

3
我正在尝试在R中进行并行计算。我想训练一个逻辑岭模型,并且我的电脑有4个核心。我希望将我的数据集平均分成4份,使用每个核心训练模型(在训练数据上),并将每个核心的结果保存到单个向量中。问题是我不知道该如何做,目前我尝试使用foreach包来并行处理,但问题是每个核心都会看到相同的训练数据。以下是使用foreach包的代码(它不会拆分数据):
library(ridge)
library(parallel)
library(foreach)

num_of_cores <- detectCores()
mydata <- read.csv("http://www.ats.ucla.edu/stat/data/binary.csv")
data_per_core <- floor(nrow(mydata)/num_of_cores)
result <- data.frame()

r <- foreach(icount(4), .combine = cbind) %dopar% {
      result <- logisticRidge(admit~ gre + gpa + rank,data = mydata)
      coefficients(result)
}

有没有想过如何将数据同时分成x个部分并并行训练模型?

你是否只局限于使用 parallelforeach?或者你是否可以接受 snowfall 解决方案? - David
2个回答

3
< p > itertools 包提供了许多函数,用于使用 foreach 循环迭代各种数据结构。在本例中,您可以使用 isplitRows 函数将数据帧按行拆分为每个工作进程一个 chunk

library(ridge)
library(doParallel)
library(itertools)

num_of_cores <- detectCores()
cl <- makePSOCKcluster(num_of_cores)
registerDoParallel(cl)
mydata <- read.csv("http://www.ats.ucla.edu/stat/data/binary.csv")

r <- foreach(d=isplitRows(mydata, chunks=num_of_cores),
             .combine = cbind, .packages="ridge") %dopar% {
  result <- logisticRidge(admit~ gre + gpa + rank, data = d)
  coefficients(result)
}

isplitRows还接受一个chunkSize参数,如果您想控制每个块的最大大小。

请注意,使用此技术,每个工作进程仅接收适当比例的mydata。这对于具有PSOCK集群的较大数据框架尤为重要。


Steve,非常感谢你,你的代码让它变得简单! - navri
1
谢谢!这很有帮助!foreach嵌套小册子(https://cran.r-project.org/web/packages/foreach/vignettes/nested.pdf)仍然提到`doNWS`作为唯一的分块方式,但是该软件包似乎已经不存在了。也许你可以在下一个`foreach`版本中修订这个小册子? - Mekki MacAulay

3
像这样的东西怎么样?它使用了 snowfall 而不是 foreach 库,但应该能够得到相同的结果。
library(snowfall)
library(ridge)

# for reproducability
set.seed(123)
num_of_cores <- parallel::detectCores()
mydata <- read.csv("http://www.ats.ucla.edu/stat/data/binary.csv")
data_per_core <- floor(nrow(mydata)/num_of_cores)

# we take random rows to each cluster, by sampleid
mydata$sampleid <- sample(1:num_of_cores, nrow(mydata), replace = T)

# create a small function that calculates the coefficients
regfun <- function(dat) {
  library(ridge) # this has to be in the function, otherwise snowfall doesnt know the logistic ridge function
  result <- logisticRidge(admit~ gre + gpa + rank, data = dat)
  coefs <- as.numeric(coefficients(result))
  return(coefs)
}

# prepare the data
datlist <- lapply(1:num_of_cores, function(i){
  dat <- mydata[mydata$sampleid == i, ]
})

# initiate the clusters
sfInit(parallel = T, cpus = num_of_cores)

# export the function and the data to the cluster
sfExport("regfun")

# calculate, (sfClusterApply is very similar to sapply)
res <- sfClusterApply(datlist, function(datlist.element) {
  regfun(dat = datlist.element)
})

#stop the cluster
sfStop()

# convert the list to a data.frame. data.table::rbindlist(list(res)) does the same job
res <- data.frame(t(matrix(unlist(res), ncol = num_of_cores)))
names(res) <- c("intercept", "gre", "gpa", "rank")
res
# res
# intercept          gre
# 1 -3.002592 1.558363e-03
# 2 -4.142939 1.060692e-03
# 3 -2.967130 2.315487e-03
# 4 -1.176943 4.786894e-05
# gpa         rank
# 1  0.7048146997 -0.382462408
# 2  0.9978841880 -0.314589628
# 3  0.6797382218 -0.464219036
# 4 -0.0004576679 -0.007618317

谢谢您的回答!不过我正在尝试模拟数据分布在不同机器上的情况,因此我想问一下是否可能只将部分数据导出到每个集群,而不是整个数据集?另一个问题是,是否有人知道如何使用parallel包实现解决方案(我不限于foreach,但不能使用snowfall)? - navri
我编辑了答案,现在不要将整个数据集发送到每个从节点,而只发送子集。但仍然是一个snowfall解决方案。请问为什么您不能使用snowfall? - David
现在运行你的示例时,我会遇到错误,因为虽然你没有将mydata发送到工作线程,但你仍在工作函数中引用它。 - Steve Weston
你说得对,我可能忘记检查这个函数了,现在已经修正了!对此给您带来的不便深感抱歉! - David
首先,非常感谢 David 的帮助!我误以为 Snowfall 不能在 OSX 上运行,所以实际上我可以使用你的代码。再次感谢你的帮助。 - navri

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接