有没有办法追踪mclapply的进度?

48

我很喜欢 plyrllply 中的设置 .progress = 'text'。但是,当使用来自multicore包的 mclapply 时,它会引起我的焦虑,因为列表项被发送到各个核心然后在最后汇总。

我一直在输出像 *当前正在进行 sim_id #....* 这样的消息,但这并不太有用,因为它没有给我一个指示已完成列表项的百分比(尽管知道脚本未卡住并继续运行是有帮助的)。

有人能建议其他想法,让我可以查看我的 .Rout 文件并了解进度吗?我考虑过添加手动计数器,但无法实现,因为必须在mclapply处理完所有列表项之前才能提供反馈。


1
请查看我对类似问题的回答:https://dev59.com/u2035IYBdhLWcg3wbPfc#5431265 - otsaw
您IP地址为143.198.54.68,由于运营成本限制,当前对于免费用户的使用频率限制为每个IP每72小时10次对话,如需解除限制,请点击左下角设置图标按钮(手机用户先点击左上角菜单按钮)。 - codeola
很好的问题,package multicore不再可用,有没有不使用multicore包的解决方法? - forecaster
1
@forecaster:是的,请看一下parallel包。 - fotNelton
我认为你可以根据自己的情况来适应以下内容:https://dev59.com/WOrzs4cB2Jgan1znPN_B#73940644。 - Mihai
如果您不介意使用parallel::parSapply,可以看一下我编写的一个包,用于跟踪并行执行的任务进度。您可以在这里找到它 - Mihai
6个回答

26

由于 mclapply 会生成多个进程,因此有些人可能希望使用fifo、管道甚至套接字。现在考虑以下示例:

library(multicore)

finalResult <- local({
    f <- fifo(tempfile(), open="w+b", blocking=T)
    if (inherits(fork(), "masterProcess")) {
        # Child
        progress <- 0.0
        while (progress < 1 && !isIncomplete(f)) {
            msg <- readBin(f, "double")
            progress <- progress + as.numeric(msg)
            cat(sprintf("Progress: %.2f%%\n", progress * 100))
        } 
        exit()
    }
    numJobs <- 100
    result <- mclapply(1:numJobs, function(...) {
        # Dome something fancy here
        # ...
        # Send some progress update
        writeBin(1/numJobs, f)
        # Some arbitrary result
        sample(1000, 1)
    })
    close(f)
    result
})

cat("Done\n")

在这里,临时文件被用作先进先出(FIFO)队列,主进程fork了一个子进程,它的唯一职责是报告当前进度。主进程继续调用mclapply,其中要评估的表达式(更准确地说,是表达式块)通过writeBin将部分进度信息写入fifo缓冲区。

由于这只是一个简单的例子,您可能需要根据自己的需求调整整个输出内容。希望对你有所帮助!


这与使用标准函数messagesink有什么不同吗?所有子进程的消息都会立即发送到同一个接收器,对吗? - otsaw
3
如果使用 mclapply,主进程会等待所有子进程完成操作,因此如果不再派生一个子进程,就无法在 mclapply 工作时接收和处理消息。为了能够接收和处理消息,需要再派生一个子进程。 - fotNelton
根据我的经验,子进程似乎会立即将stdout和stderr发送到与父进程相同的位置。但也许这取决于操作系统? - otsaw
@otsaw:你所说的关于stdout和stderr的内容是正确的,但由于主进程在等待mclapply完成时被阻塞,因此需要另一个进程或线程来处理进度输出。 - fotNelton
1
你好 forNelton,这是一个非常有用的答案,包multicore在CRAN中不再可用。是否有一种方法可以避免使用multicore包?谢谢。 - forecaster
@forecaster:是的,请看一下parallel包。 - fotNelton

16

本质上是添加了另一个版本的@fotNelson的解决方案,但做了一些修改:

  • 支持所有mclapply函数的快速替换
  • 捕获ctrl-c调用并优雅地中止
  • 使用内置进度条(txtProgressBar)
  • 选择是否跟踪进度、使用指定样式的进度条选项
  • 使用parallel而不是已从CRAN中删除的multicore
  • 按照mclapply将X强制转换为列表(因此length(X)可以得到预期结果)
  • 顶部采用roxygen2风格的文档

希望对其他人有所帮助...

library(parallel)

#-------------------------------------------------------------------------------
#' Wrapper around mclapply to track progress
#' 
#' Based on https://dev59.com/-Ggu5IYBdhLWcg3w6bKo
#' 
#' @param X         a vector (atomic or list) or an expressions vector. Other
#'                  objects (including classed objects) will be coerced by
#'                  ‘as.list’
#' @param FUN       the function to be applied to
#' @param ...       optional arguments to ‘FUN’
#' @param mc.preschedule see mclapply
#' @param mc.set.seed see mclapply
#' @param mc.silent see mclapply
#' @param mc.cores see mclapply
#' @param mc.cleanup see mclapply
#' @param mc.allow.recursive see mclapply
#' @param mc.progress track progress?
#' @param mc.style    style of progress bar (see txtProgressBar)
#'
#' @examples
#' x <- mclapply2(1:1000, function(i, y) Sys.sleep(0.01))
#' x <- mclapply2(1:3, function(i, y) Sys.sleep(1), mc.cores=1)
#' 
#' dat <- lapply(1:10, function(x) rnorm(100)) 
#' func <- function(x, arg1) mean(x)/arg1 
#' mclapply2(dat, func, arg1=10, mc.cores=2)
#-------------------------------------------------------------------------------
mclapply2 <- function(X, FUN, ..., 
    mc.preschedule = TRUE, mc.set.seed = TRUE,
    mc.silent = FALSE, mc.cores = getOption("mc.cores", 2L),
    mc.cleanup = TRUE, mc.allow.recursive = TRUE,
    mc.progress=TRUE, mc.style=3) 
{
    if (!is.vector(X) || is.object(X)) X <- as.list(X)

    if (mc.progress) {
        f <- fifo(tempfile(), open="w+b", blocking=T)
        p <- parallel:::mcfork()
        pb <- txtProgressBar(0, length(X), style=mc.style)
        setTxtProgressBar(pb, 0) 
        progress <- 0
        if (inherits(p, "masterProcess")) {
            while (progress < length(X)) {
                readBin(f, "double")
                progress <- progress + 1
                setTxtProgressBar(pb, progress) 
            }
            cat("\n")
            parallel:::mcexit()
        }
    }
    tryCatch({
        result <- mclapply(X, ..., function(...) {
                res <- FUN(...)
                if (mc.progress) writeBin(1, f)
                res
            }, 
            mc.preschedule = mc.preschedule, mc.set.seed = mc.set.seed,
            mc.silent = mc.silent, mc.cores = mc.cores,
            mc.cleanup = mc.cleanup, mc.allow.recursive = mc.allow.recursive
        )

    }, finally = {
        if (mc.progress) close(f)
    })
    result
}

这个版本实际上并没有显示任务的进度。进度条从0%开始,一直保持在那里。 - Ariel
这个函数在我的OS X和Linux上运行良好,所以可能是一个Windows的问题。 - waferthin
1
在Windows下不支持fork,所以这些都不应该工作,包括mclapply本身(使用多个核心)。 - codeola
我使用的是OSX和Linux系统。当你只指定一个核心时,mclapply会显示警告和错误信息。如果使用多个核心,它就无法正常工作。请问其他使用OSX或Linux系统的用户能否确认这在他们的系统上是否可行?我是通过RStudio运行的。 - Ariel
1
看起来在Rstudio中parallel:::mcfork的工作不如预期。这个问题超出了我的能力范围,最好在stackoverflow上提出一个单独的问题来处理。如果我得到解决方案,我会在这里发布... - waferthin
3
这种跟踪进度的方法无法在Rstudio中使用(请参见此处的讨论:https://dev59.com/WF4d5IYBdhLWcg3wAOZw),因为分叉进程的输出(它将进度打印到屏幕上)在Rstudio中会被忽略。 - waferthin

13

pbapply包已经为通用情况实现了这一点(即在类Unix和Windows上,也适用于RStudio)。pblapplypbsapply都有一个cl参数。 根据文档

可以通过cl参数启用并行处理。当cl是“集群”对象时调用parLapply,当cl是整数时调用mclapply。 显示进度条会增加主进程和节点/子进程之间的通信开销,相对于没有进度条的函数的并行等效函数。 当禁用进度条时(即getOption("pboptions")$type == "none" dopb()FALSE),函数将回退到其原始等效函数。这是当interactive()FALSE(即从命令行R脚本调用)时的默认设置。

如果没有提供cl(或传递了NULL),则使用默认的非并行lapply,也包括进度条。


你是否找到了一种干净的方法来在pblapply/mclapply函数周围包装tryCatch,以便在抛出异常时停止集群?当我尝试在RStudio中停止这些并行进程时,我最终会得到无法控制的核心,必须通过终端手动杀死它们。 - philiporlando
1
@spacedSparking 不是,但我一直在使用furrr包。 - Axeman

7

这是一个基于@fotNelton's solution的函数,可以在通常使用mcapply的任何地方应用。

mcadply <- function(X, FUN, ...) {
  # Runs multicore lapply with progress indicator and transformation to
  # data.table output. Arguments mirror those passed to lapply.
  #
  # Args:
  # X:   Vector.
  # FUN: Function to apply to each value of X. Note this is transformed to 
  #      a data.frame return if necessary.
  # ...: Other arguments passed to mclapply.
  #
  # Returns:
  #   data.table stack of each mclapply return value
  #
  # Progress bar code based on https://dev59.com/-Ggu5IYBdhLWcg3w6bKo#10993589
  require(multicore)
  require(plyr)
  require(data.table)

  local({
    f <- fifo(tempfile(), open="w+b", blocking=T)
    if (inherits(fork(), "masterProcess")) {
      # Child
      progress <- 0
      print.progress <- 0
      while (progress < 1 && !isIncomplete(f)) {
        msg <- readBin(f, "double")
        progress <- progress + as.numeric(msg)
        # Print every 1%
        if(progress >= print.progress + 0.01) {
          cat(sprintf("Progress: %.0f%%\n", progress * 100))
          print.progress <- floor(progress * 100) / 100
        }
      }
      exit()
    }

    newFun <- function(...) {
      writeBin(1 / length(X), f)
      return(as.data.frame(FUN(...)))
    }

    result <- as.data.table(rbind.fill(mclapply(X, newFun, ...)))
    close(f)
    cat("Done\n")
    return(result)
  })
}

6

你可以使用系统的echo函数从你的工作进程中写入内容,只需将以下行添加到你的函数中:

myfun <- function(x){
if(x %% 5 == 0) system(paste("echo 'now processing:",x,"'"))
dosomething(mydata[x])
}

result <- mclapply(1:10,myfun,mc.cores=5)
> now processing: 5 
> now processing: 10 

这将在您传递索引时起作用,例如,不要传递数据列表,而是传递索引并在工作函数中提取数据。

这对我来说是更顺畅的方法,也因为我可以为每个“循环”添加额外的细节。 - Garini

2

根据@fotNelson的回答,使用进度条代替逐行打印,并使用mclapply调用外部函数。

library('utils')
library('multicore')

prog.indic <- local({ #evaluates in local environment only
    f <- fifo(tempfile(), open="w+b", blocking=T) # open fifo connection
    assign(x='f',value=f,envir=.GlobalEnv)
    pb <- txtProgressBar(min=1, max=MC,style=3)

    if (inherits(fork(), "masterProcess")) { #progress tracker
        # Child
        progress <- 0.0
        while (progress < MC && !isIncomplete(f)){ 
            msg <- readBin(f, "double")
                progress <- progress + as.numeric(msg)

            # Updating the progress bar.
            setTxtProgressBar(pb,progress)
            } 


        exit()
        }
   MC <- 100
   result <- mclapply(1:MC, .mcfunc)

    cat('\n')
    assign(x='result',value=result,envir=.GlobalEnv)
    close(f)
    })

.mcfunc<-function(i,...){
        writeBin(1, f)
        return(i)
    }

将fifo连接分配给.GlobalEnv是必要的,以便从mclapply调用之外的函数中使用它。感谢您的问题和之前的回复,我已经想了一段时间如何做到这一点。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接