如何在多个 R 作业并行运行完成后运行另一个 R 脚本?

3
我需要运行脚本的步骤是,首先使用rstudioapi::jobRunScript()函数并行运行4个R脚本。每个在并行运行的脚本不从任何环境中导入内容,而是将创建的数据框导出到全局环境中。我的第5个R脚本建立在由并行运行的4个R脚本创建的数据框的基础上,同时这第5个脚本在控制台中运行。如果有方法可以在前4个R脚本并行运行完成后在后台运行第5个脚本,那将会更好。我还试图缩短整个过程的总运行时间。
虽然我已经知道如何并行运行前4个R脚本,但我的任务尚未完成,因为我找不到方法来触发第5个R脚本的运行。希望你们可以帮我解决这个问题。

2
我认为使用jobRunScript运行的作业不会影响(例如存储结果在)调用环境。前四个作业是否将它们的结果保存在外部(例如文件或数据库)? - r2evans
什么是 jobRunScript()?哦,我明白了,它是 rstudioapi::jobRunScript() - HenrikB
嗨@r2evans,结果只是漂浮在全局环境中。这是我的做法:'jobRunScript("C:\Desktop\CY alloc\Scripts\Prep 1.R", importEnv=FALSE, exportEnv="R_GlobalEnv")''jobRunScript("C:\Desktop\CY alloc\Scripts\Prep 2.R", importEnv=FALSE, exportEnv="R_GlobalEnv")' - Denelle
3个回答

6

我认为这个方法有些过于宽松。虽然rstudioapi可以用于运行并行任务,但它不太灵活并且输出结果也不是非常有用。在R中,使用几个包实现了比较好的parallel函数库,并提供了更加简单和良好的接口。以下是三个选项,这些选项还允许从不同文件中“输出”内容。

package = parallel

使用parallel包,我们可以非常简单地实现此功能。只需创建一个要被源化的文件向量,并在每个线程中执行source命令即可。主进程将在运行时锁定,但如果您必须等待它们完成,这并不会对执行造成太大影响。

library(parallel)
ncpu <- detectCores()
cl <- makeCluster(ncpu)
# full path to file that should execute 
files <- c(...) 
# use an lapply in parallel.
result <- parLapply(cl, files, source)
# Remember to close the cluster
stopCluster(cl)
# If anything is returned this can now be used.

作为一则注记,有几个包与 "parallel" 包具有类似的接口,这些包是基于 "snow" 包构建的,因此了解 "parallel" 包是一个很好的基础。
包 = foreach
除了 "parallel" 包外,另一个选择是 "foreach" 包,它提供了类似于“for 循环”接口的东西,简化了接口,同时提供了更多的灵活性并自动导入必要的库和变量(尽管手动执行此操作更安全)。然而,“foreach” 包依赖于 “parallel” 和 “doParallel” 包来设置集群。
library(parallel)
library(doParallel)
library(foreach)
ncpu <- detectCores()
cl <- makeCluster(ncpu)
files <- c(...) 
registerDoParallel(cl)
# Run parallel using foreach
# remember %dopar% for parallel. %do% for sequential.
result <- foreach(file = files, .combine = list, .multicombine = TRUE) %dopar% { 
  source(file)
  # Add any code before or after source.
}
# Stop cluster
stopCluster(cl)
# Do more stuff. Result holds any result returned by foreach.

虽然使用.combine, .packages.export会增加一些代码行数,但在R中使用并行计算时,这些函数提供了非常简单的接口。

包= future

future是一种更灵活的并行接口,比parallelforeach都要好。它允许进行异步并行编程。实现可能看起来有点令人畏惧,而我提供的示例仅是可能性的冰山一角。
值得一提的是,虽然future软件包确实提供了导入运行所需的函数和包的自动功能,但经验告诉我,这仅限于调用的任何第一层深度(有时更少),因此仍然需要导出。
foreach依赖于parallel(或类似)来启动群集,foreach将自行使用所有可用的内核启动一个群集。只需调用plan(multiprocess)即可开始多核会话。

library(future)
files <- c(...) 
# Start multiprocess session
plan(multiprocess)
# Simple wrapper function, so we can iterate over the files variable easier
source_future <- function(file)
  future(file)
results <- lapply(files, source_future)
# Do some calculations in the meantime
print('hello world, I am running while waiting for the futures to finish')
# Force waiting for the futures to finish
resolve(results)
# Extract any result from the futures
results <- values(results)
# Clean up the process (close down clusters)
plan(sequential)
# Run some more code.

现在看起来可能有些繁琐,但是总的机制是:

  1. 调用 plan(multiprocess)
  2. 使用 future(或者 %<-%code>,这里不再赘述)执行一些函数
  3. 如果还有其他不依赖于进程的代码需要运行,则执行该代码
  4. 使用 resolve 等待单个未来或多个未来列表(或环境)的结果
  5. 使用 value 获取单个未来的结果,或者使用 values 获取多个未来列表(或环境)的结果
  6. 通过使用 plan(sequential) 清理未来环境中的任何集群
  7. 继续运行依赖于您的未来结果的代码。

我相信这3个软件包提供了与多处理程序(至少在CPU上)相关的所有必要接口,任何用户都需要与之交互。其他软件包提供替代接口,而对于异步操作,我只知道 futurepromises。总的来说,我建议大多数用户在转向异步编程时非常小心,因为这可能会导致整套问题,其发生频率比同步并行编程低。

我希望这可以提供一种替代(非常受限制的)rstudioapi接口,我相当确定该接口从未被设计成由用户自己用于并行编程,而更可能是通过接口本身执行诸如并行构建软件包之类的任务。


1
关于“package = future: ...自动导入运行代码所需的函数和包,经验告诉我这仅限于任何调用的第一层深度(有时更少),因此仍然需要导出。”请考虑在未来问题跟踪器上发布可重现的示例,因为目标是尽可能涵盖多种用例(模块棘手的边角情况)。 - HenrikB
1
@HenrikB,我会记住的。在我的工作中已经出现了几次,但目前手头没有可重现的例子。谁知道它们可能已经在此期间被修复了。 - Oliver
嗨,Oliver,谢谢你。关于包=parallel,我需要在每个并行运行的脚本末尾使用return函数,以便可以通过“result”对象捕获生成的数据框吗? - Denelle
嗨@Danelle。不,这并不是必要的。如果没有返回值,则默认情况下返回NULL。如果您也没有任何要返回的内容,则可以删除result <-。对于所有三个解决方案都适用。 :-) - Oliver
1
明白了,@Oliver!并行包对我很有用,而且在我的当前代码结构中实现起来最直接和最容易。感谢您的回复! - Denelle

2
您可以使用promises结合future来完成:使用promises::promise_all后跟promises::then可以在启动最后一个作为后台进程之前等待先前futures的完成。
当作业在后台运行时,您仍然可以控制控制台。
library(future)
library(promises)

plan(multisession)
# Job1
fJob1<- future({
  # Simulate a short duration job
  Sys.sleep(3)
  cat('Job1 done \n')
})

# Job2
fJob2<- future({
  # Simulate a medium duration job
  Sys.sleep(6)
  cat('Job2 done \n')
})


# Job3
fJob3<- future({
  # Simulate a long duration job
  Sys.sleep(10)
  cat('Job3 done \n')
})

# last Job
runLastJob <- function(res) {
  cat('Last job launched \n')
  # launch here script for last job
}

# Cancel last Job
cancelLastJob <- function(res) {
  cat('Last job not launched \n')
}

#  Wait for all jobs to be completed and launch last job
p_wait_all <- promises::promise_all(fJob1, fJob2, fJob3 )
promises::then(p_wait_all,  onFulfilled = runLastJob, onRejected = cancelLastJob)

Job1 done 
Job2 done 
Job3 done 
Last job launched 

0

我不知道这对你当前的情况有多大的适应性,但是这里有一种方法可以让四个任务并行运行,获取它们的返回值,然后触发第五个表达式/函数。

前提是使用callr::r_bg来运行单个文件。实际上,这会运行一个function,而不是一个文件,因此我将修改这些文件的外观期望一点点

我将编写一个辅助脚本,旨在模仿你的四个脚本之一。我猜你也想能够正常地源化它(直接运行它而不是作为一个函数),所以我将生成脚本文件,使它“知道”它是被源化还是直接运行(基于Rscript detect if R script is being called/sourced from another script)。 (如果你了解python,这类似于python的if __name__ == "__main__"技巧。)

辅助脚本名为somescript.R

somefunc <- function(seconds) {
  # put the contents of a script file in this function, and have
  # it return() the data you need back in the calling environment
  Sys.sleep(seconds)
  return(mtcars[sample(nrow(mtcars),2),1:3])
}

if (sys.nframe() == 0L) {
  # if we're here, the script is being Rscript'd, not source'd
  somefunc(3)
}

作为演示,如果在控制台上执行source,这仅仅定义了函数(或多个函数,如果你需要的话),它并不会执行最后一个if块中的代码。
system.time(source("~/StackOverflow/14182669/somescript.R"))
#                                   # <--- no output, it did not return a sample from mtcars
#    user  system elapsed 
#       0       0       0           # <--- no time passed

但如果我在终端中使用 Rscript 运行这个程序,

$ time /c/R/R-4.0.2/bin/x64/Rscript somescript.R
               mpg cyl  disp
Merc 280C     17.8   6 167.6
Mazda RX4 Wag 21.0   6 160.0

real    0m3.394s                    # <--- 3 second sleep
user    0m0.000s
sys     0m0.015s

回到前提。不要使用四个“脚本”,而是像我上面的somescript.R一样重写您的脚本文件。如果正确完成,它们可以像Rscript一样运行,也可以用不同的意图进行source
我将使用这个脚本四次,而不是四个脚本。以下是我们想要自动化的手动运行过程:
# library(callr)
tasks <- list(
  callr::r_bg(somefunc, args = list(5)),
  callr::r_bg(somefunc, args = list(1)),
  callr::r_bg(somefunc, args = list(10)),
  callr::r_bg(somefunc, args = list(3))
)
sapply(tasks, function(tk) tk$is_alive())
# [1]  TRUE FALSE  TRUE FALSE
### time passes
sapply(tasks, function(tk) tk$is_alive())
# [1] FALSE FALSE  TRUE FALSE
sapply(tasks, function(tk) tk$is_alive())
# [1] FALSE FALSE FALSE FALSE

tasks[[1]]$get_result()
#                    mpg cyl  disp  hp drat    wt  qsec vs am gear carb
# Merc 280          19.2   6 167.6 123 3.92 3.440 18.30  1  0    4    4
# Chrysler Imperial 14.7   8 440.0 230 3.23 5.345 17.42  0  0    3    4

我们可以用编程自动化实现这个。

source("somescript.R")
message(Sys.time(), " starting")
# 2020-08-28 07:45:31 starting
tasks <- list(
  callr::r_bg(somefunc, args = list(5)),
  callr::r_bg(somefunc, args = list(1)),
  callr::r_bg(somefunc, args = list(10)),
  callr::r_bg(somefunc, args = list(3))
)
# some reasonable time-between-checks
while (any(sapply(tasks, function(tk) tk$is_alive()))) {
  message(Sys.time(), " still waiting")
  Sys.sleep(1)                      # <-- over to you for a reasonable poll interval
}
# 2020-08-28 07:45:32 still waiting
# 2020-08-28 07:45:33 still waiting
# 2020-08-28 07:45:34 still waiting
# 2020-08-28 07:45:35 still waiting
# 2020-08-28 07:45:36 still waiting
# 2020-08-28 07:45:37 still waiting
# 2020-08-28 07:45:38 still waiting
# 2020-08-28 07:45:39 still waiting
# 2020-08-28 07:45:40 still waiting
# 2020-08-28 07:45:41 still waiting
message(Sys.time(), " done!")
# 2020-08-28 07:45:43 done!
results <- lapply(tasks, function(tk) tk$get_result())
str(results)
# List of 4
#  $ :'data.frame': 2 obs. of  3 variables:
#   ..$ mpg : num [1:2] 24.4 32.4
#   ..$ cyl : num [1:2] 4 4
#   ..$ disp: num [1:2] 146.7 78.7
#  $ :'data.frame': 2 obs. of  3 variables:
#   ..$ mpg : num [1:2] 30.4 14.3
#   ..$ cyl : num [1:2] 4 8
#   ..$ disp: num [1:2] 95.1 360
#  $ :'data.frame': 2 obs. of  3 variables:
#   ..$ mpg : num [1:2] 15.2 15.8
#   ..$ cyl : num [1:2] 8 8
#   ..$ disp: num [1:2] 276 351
#  $ :'data.frame': 2 obs. of  3 variables:
#   ..$ mpg : num [1:2] 14.3 15.2
#   ..$ cyl : num [1:2] 8 8
#   ..$ disp: num [1:2] 360 304

现在运行你的第五个函数/脚本。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接