这是我的实现。它是一个函数
chunkmap
,接受一个向量化的函数、一个应该被向量化的参数列表以及一个不应该被向量化的参数列表(即常数),并返回与直接在参数上调用函数相同的结果,但是结果是并行计算的。对于函数
f
,向量参数
v1
、
v2
、
v3
和标量参数
s1
、
s2
,以下应该返回相同的结果:
f(a=v1, b=v2, c=v3, d=s1, e=s2)
f(c=v3, b=v2, e=s2, a=v1, d=s1)
chunkapply(FUN=f, VECTOR.ARGS=list(a=v1, b=v2, c=v3), SCALAR.ARGS=list(d=s1, e=s2))
chunkapply(FUN=f, SCALAR.ARGS=list(e=s2, d=s1), VECTOR.ARGS=list(a=v1, c=v3, b=v2))
由于chunkapply
函数无法知道f
的哪些参数是矢量化的,哪些不是,因此在调用时需要您自行指定,否则会得到错误的结果。通常应该命名参数以确保它们正确绑定。
library(foreach)
library(iterators)
library(doMC)
registerDoMC()
get.chunk.size <- function(vec.length,
min.chunk.size=NULL, max.chunk.size=NULL,
max.chunks=NULL) {
if (is.null(max.chunks)) {
max.chunks <- getDoParWorkers()
}
size <- vec.length / max.chunks
if (!is.null(max.chunk.size)) {
size <- min(size, max.chunk.size)
}
if (!is.null(min.chunk.size)) {
size <- max(size, min.chunk.size)
}
num.chunks <- ceiling(vec.length / size)
actual.size <- ceiling(vec.length / num.chunks)
return(actual.size)
}
ichunk.vectors <- function(vectors=NULL,
min.chunk.size=NULL,
max.chunk.size=NULL,
max.chunks=NULL) {
recycle.length <- max(sapply(vectors, length))
actual.chunk.size <- get.chunk.size(recycle.length, min.chunk.size, max.chunk.size, max.chunks)
num.chunks <- ceiling(recycle.length / actual.chunk.size)
i <- 1
it <- idiv(recycle.length, chunks=num.chunks)
nextEl <- function() {
n <- nextElem(it)
ix <- seq(i, length = n)
i <<- i + n
vchunks <- foreach(v=vectors) %do% v[1+ (ix-1) %% length(v)]
names(vchunks) <- names(vectors)
vchunks
}
obj <- list(nextElem = nextEl)
class(obj) <- c("ichunk", "abstractiter", "iter")
obj
}
chunkapply <- function(FUN, VECTOR.ARGS, SCALAR.ARGS=list(), MERGE=TRUE, ...) {
stopifnot(is.list(VECTOR.ARGS))
stopifnot(length(VECTOR.ARGS) >= 1)
stopifnot(is.list(SCALAR.ARGS))
if (MERGE) {
combine.fun <- append
} else {
combine.fun <- foreach:::defcombine
}
foreach(vchunk=ichunk.vectors(vectors=VECTOR.ARGS, ...),
.combine=combine.fun,
.options.multicore = mcoptions) %dopar%
{
do.call(FUN, args=append(vchunk, SCALAR.ARGS))
}
}
maybe.chunkapply <- function(FUN, VECTOR.ARGS, SCALAR.ARGS=list(), ...) {
if (getDoParWorkers() > 1) {
chunkapply(FUN, VECTOR.ARGS, SCALAR.ARGS, ...)
} else {
do.call(FUN, append(VECTOR.ARGS, SCALAR.ARGS))
}
}
以下是一些示例,展示了
chunkapply(f,list(x))
与
f(x)
产生相同的结果。我将max.chunk.size设置得非常小,以确保实际使用分块算法。
>
> identical(chunkapply(function(x,y) x*y, list(1:50), list(2), max.chunk.size=10), 1:50 * 2)
[1] TRUE
>
> a <- rnorm(n=100)
> cutoff <- 1
> identical(chunkapply(function(x,limit) x[x<=limit], list(x=a), list(limit=cutoff), max.chunk.size=10), a[a<cutoff])
[1] TRUE
如果有比“chunkapply”更好的名称,请提出建议。
编辑:
正如另一个答案所指出的,multicore包中有一个名为“pvec”的函数,其功能与我编写的函数非常相似。对于简单的情况,您应该使用它,并且应该投票支持Jonas Rauch的答案。但是,我的函数更加通用,因此如果以下任何情况适用于您,则您可能希望考虑改用我的函数:
- 您需要使用除multicore之外的并行后端(例如MPI)。我的函数使用foreach,因此您可以使用为foreach提供后端的任何并行化框架。
- 您需要传递多个矢量化参数。 “pvec”仅矢量化单个参数,因此您无法轻松地使用“pvec”实现并行矢量化加法,例如。我的函数允许您指定任意参数。
f
会透明地利用它进行并行化。所以现在我只需要弄清楚如何设置MPI。 - Ryan C. Thompsonf(X)
正是我想要做的,只是更快。扩展我的先前评论,如果正确设置了MPI,则“f”的实现会自动使用MPI。因此,我现在必须决定是在这台服务器上设置MPI更容易,还是自己进行分块更容易。 - Ryan C. Thompsonp
,主题s
和cores=2
,例如,tasks <- split(seq_along(p), cut(seq_along(p), cores))
和mclapply(tasks, function(i, p, ...) pairwiseAlignment(p[i], ...), p, s, scoreOnly=TRUE)
与pairwiseAlignment(p, s, scoreOnly=TRUE)
相比。执行时间为2倍(对于我的两个内核),结果相同。如果不仅仅是分数,合并对象更具挑战性。 - Martin Morgan