在R语言中,最简单的向量化函数并行化方法是什么?

17

我有一个非常大的列表X和一个向量化函数f。我想计算f(X),但如果我使用单个核心进行计算,这将需要很长时间。我可以(访问)一个48核服务器。最简单的方法是如何并行计算f(X)?以下不是正确答案:

library(foreach)
library(doMC)
registerDoMC()

foreach(x=X, .combine=c) %dopar% f(x)

上述代码确实会并行计算f(X),但它会通过将f分别应用于X的每个元素来实现。这忽略了f的向量化特性,结果可能会使事情变得更慢而不是更快。我想将X分成合理大小的块,并对其应用f,而不是逐个元素地将f应用于X
那么,我应该只是手动将X分成48个相等大小的子列表,然后并行地对每个子列表应用f,然后手动组合结果吗?还是有专门设计的软件包?
如果有人感到困惑,我的具体用例在此处

实际上,我刚刚注意到如果发现MPI,f会透明地利用它进行并行化。所以现在我只需要弄清楚如何设置MPI。 - Ryan C. Thompson
1
这是一个非常好的问题。但我缺少一件事情(至少...可能不止一件)。如果X是一个列表,你真的能在它上面运行向量化函数f()吗?似乎X需要是一个向量或矩阵才能从f()内部的向量化中受益。 - JD Long
你应该在单台机器上使用多核而不是MPI;MPI会在同一台机器上启动48个R实例,并将数据复制48次。痛苦啊。将X分成大约相等大小的48个块,对每个块执行f(chunk),然后想办法将它们拼接在一起;你可以使用doMC,在其中foreach循环遍历块而不是原始元素。 - Martin Morgan
实际上,该函数接受一个“DNAStringSet”类的实例,它是DNA序列列表的容器,而“X”是该类的一个实例。但重点是它是矢量化的,即f(X)正是我想要做的,只是更快。扩展我的先前评论,如果正确设置了MPI,则“f”的实现会自动使用MPI。因此,我现在必须决定是在这台服务器上设置MPI更容易,还是自己进行分块更容易。 - Ryan C. Thompson
听起来像是pairwiseAlignment;对于模式p,主题scores=2,例如,tasks <- split(seq_along(p), cut(seq_along(p), cores))mclapply(tasks, function(i, p, ...) pairwiseAlignment(p[i], ...), p, s, scoreOnly=TRUE)pairwiseAlignment(p, s, scoreOnly=TRUE)相比。执行时间为2倍(对于我的两个内核),结果相同。如果不仅仅是分数,合并对象更具挑战性。 - Martin Morgan
5个回答

6

虽然这是一个旧问题,但对于那些通过谷歌(像我一样)偶然发现此问题的人来说,这可能仍然很有趣:看看multicore包中的pvec函数。我认为它正好符合您的需求。


在 Windows 上运行 R 的用户是否有类似的功能? - Zach
Zach:我认为你应该看一下Revolution R,它是专门针对Windows平台并注重并行处理的R版本。使用doSMP包作为后端,采用%dopar%方法可能是最好的方式。Ryan Thompson的答案应该可以很好地使用它。 - Jonas Rauch

5
这是我的实现。它是一个函数chunkmap,接受一个向量化的函数、一个应该被向量化的参数列表以及一个不应该被向量化的参数列表(即常数),并返回与直接在参数上调用函数相同的结果,但是结果是并行计算的。对于函数f,向量参数v1v2v3和标量参数s1s2,以下应该返回相同的结果:
f(a=v1, b=v2, c=v3, d=s1, e=s2)
f(c=v3, b=v2, e=s2, a=v1, d=s1)
chunkapply(FUN=f, VECTOR.ARGS=list(a=v1, b=v2, c=v3), SCALAR.ARGS=list(d=s1, e=s2))
chunkapply(FUN=f, SCALAR.ARGS=list(e=s2, d=s1), VECTOR.ARGS=list(a=v1, c=v3, b=v2))

由于chunkapply函数无法知道f的哪些参数是矢量化的,哪些不是,因此在调用时需要您自行指定,否则会得到错误的结果。通常应该命名参数以确保它们正确绑定。

library(foreach)
library(iterators)
# Use your favorite doPar backend here
library(doMC)
registerDoMC()

get.chunk.size <- function(vec.length,
                           min.chunk.size=NULL, max.chunk.size=NULL,
                           max.chunks=NULL) {
  if (is.null(max.chunks)) {
    max.chunks <- getDoParWorkers()
  }
  size <- vec.length / max.chunks
  if (!is.null(max.chunk.size)) {
    size <- min(size, max.chunk.size)
  }
  if (!is.null(min.chunk.size)) {
    size <- max(size, min.chunk.size)
  }
  num.chunks <- ceiling(vec.length / size)
  actual.size <- ceiling(vec.length / num.chunks)
  return(actual.size)
}

ichunk.vectors <- function(vectors=NULL,
                           min.chunk.size=NULL,
                           max.chunk.size=NULL,
                           max.chunks=NULL) {
  ## Calculate number of chunks
  recycle.length <- max(sapply(vectors, length))
  actual.chunk.size <- get.chunk.size(recycle.length, min.chunk.size, max.chunk.size, max.chunks)
  num.chunks <- ceiling(recycle.length / actual.chunk.size)

  ## Make the chunk iterator
  i <- 1
  it <- idiv(recycle.length, chunks=num.chunks)
  nextEl <- function() {
    n <- nextElem(it)
    ix <- seq(i, length = n)
    i <<- i + n
    vchunks <- foreach(v=vectors) %do% v[1+ (ix-1) %% length(v)]
    names(vchunks) <- names(vectors)
    vchunks
  }
  obj <- list(nextElem = nextEl)
  class(obj) <- c("ichunk", "abstractiter", "iter")
  obj
}

chunkapply <- function(FUN, VECTOR.ARGS, SCALAR.ARGS=list(), MERGE=TRUE, ...) {
  ## Check that the arguments make sense
  stopifnot(is.list(VECTOR.ARGS))
  stopifnot(length(VECTOR.ARGS) >= 1)
  stopifnot(is.list(SCALAR.ARGS))
  ## Choose appropriate combine function
  if (MERGE) {
    combine.fun <- append
  } else {
    combine.fun <- foreach:::defcombine
  }
  ## Chunk and apply, and maybe merge
  foreach(vchunk=ichunk.vectors(vectors=VECTOR.ARGS, ...),
          .combine=combine.fun,
          .options.multicore = mcoptions) %dopar%
  {
    do.call(FUN, args=append(vchunk, SCALAR.ARGS))
  }
}

## Only do chunkapply if it will run in parallel
maybe.chunkapply <- function(FUN, VECTOR.ARGS, SCALAR.ARGS=list(), ...) {
  if (getDoParWorkers() > 1) {
    chunkapply(FUN, VECTOR.ARGS, SCALAR.ARGS, ...)
  } else {
    do.call(FUN, append(VECTOR.ARGS, SCALAR.ARGS))
  }
}

以下是一些示例,展示了chunkapply(f,list(x))f(x)产生相同的结果。我将max.chunk.size设置得非常小,以确保实际使用分块算法。
> # Generate all even integers from 2 to 100 inclusive
> identical(chunkapply(function(x,y) x*y, list(1:50), list(2), max.chunk.size=10), 1:50 * 2)
[1] TRUE

> ## Sample from a standard normal distribution, then discard values greater than 1
> a <- rnorm(n=100)
> cutoff <- 1
> identical(chunkapply(function(x,limit) x[x<=limit], list(x=a), list(limit=cutoff), max.chunk.size=10), a[a<cutoff])
[1] TRUE

如果有比“chunkapply”更好的名称,请提出建议。
编辑:
正如另一个答案所指出的,multicore包中有一个名为“pvec”的函数,其功能与我编写的函数非常相似。对于简单的情况,您应该使用它,并且应该投票支持Jonas Rauch的答案。但是,我的函数更加通用,因此如果以下任何情况适用于您,则您可能希望考虑改用我的函数:
- 您需要使用除multicore之外的并行后端(例如MPI)。我的函数使用foreach,因此您可以使用为foreach提供后端的任何并行化框架。 - 您需要传递多个矢量化参数。 “pvec”仅矢量化单个参数,因此您无法轻松地使用“pvec”实现并行矢量化加法,例如。我的函数允许您指定任意参数。

1
你写的函数非常有用。你应该将它打包成一个R包! - Zach

3

itertools包旨在解决这种问题。在这种情况下,我会使用isplitVector

n <- getDoParWorkers()
foreach(x=isplitVector(X, chunks=n), .combine='c') %dopar% f(x)

在这个例子中,pvec 显然更快更简单,但是可以使用 doParallel 包在 Windows 上实现相同的效果。


好的答案。我将把它标记为被接受的答案,因为它是最便携的(pvec不支持Windows)。此外,可以按照以下方式实现基于foreach的pvec:fpvec <- function(v, FUN) foreach(x=isplitVector(v, chunks=getDoParWorkers()), .combine='c') %dopar% f(x) - Ryan C. Thompson

0

这样怎么样?R将利用所有可用的内存,multicore将并行处理所有可用的核心。

library(multicore)
result = mclapply(X, function,mc.preschedule=FALSE, mc.set.seed=FALSE)

与foreach示例一样,这种方法不会利用f的向量化。 - Ryan C. Thompson

0

据我所知,这个问题与我在问题中举的例子相同。它只是将标量函数向量化并进行并行化处理。它不会并行化已经向量化的函数。 - Ryan C. Thompson

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接