在Julia中编写一个函数,使用addprocs()和pmap()函数。

5
在Julia中,我想在一个定义在模块内的函数中使用addprocspmap。以下是一个愚蠢的例子:
module test

using Distributions

export g, f

function g(a, b)
  a + rand(Normal(0, b))
end

function f(A, b)

  close = false
  if length(procs()) == 1    #  If there are already extra workers,
    addprocs()               #  use them, otherwise, create your own.
    close = true
  end

  W  = pmap(x -> g(x, b), A)

  if close == true
    rmprocs(workers())       #  Remove the workers you created.
  end

  return W

end

end

test.f(randn(5), 1)

这会返回一个非常长的错误信息。
WARNING: Module test not defined on process 4
WARNING: Module test not defined on process 3
fatal error on fatal error on WARNING: Module test not defined on process 2
43: : WARNING: Module test not defined on process 5
fatal error on fatal error on 5: 2: ERROR: UndefVarError: test not defined
 in deserialize at serialize.jl:504
 in handle_deserialize at serialize.jl:477
 in deserialize at serialize.jl:696

...

 in message_handler_loop at multi.jl:878
 in process_tcp_streams at multi.jl:867
 in anonymous at task.jl:63
Worker 3 terminated.
Worker 2 terminated.ERROR (unhandled task failure): EOFError: read end of file
WARNING: rmprocs: process 1 not removed

Worker 5 terminated.ERROR (unhandled task failure): EOFError: read end of file

4-element Array{Any,1}:Worker 4 terminated.ERROR (unhandled task failure): EOFError: read end of file


 ERROR (unhandled task failure): EOFError: read end of file
ProcessExitedException()
 ProcessExitedException()
 ProcessExitedException()
 ProcessExitedException()

我想做的是编写一个包,其中包含一些函数,可以选择性地并行执行操作。因此,像f这样的函数可能会接受一个参数par::Bool,如果用户使用par = true调用f,则会执行类似我上面展示的操作,否则将循环执行。因此,在f的定义内部(以及在模块test的定义内部),我希望创建工作进程并向它们广播Distributions包和函数g
1个回答

1

在你的函数中使用@everywhere有什么问题?例如,以下代码在我的电脑上可以正常工作。

function f(A, b)

  close = false
  if length(procs()) == 1    #  If there are already extra workers,
    addprocs()               #  use them, otherwise, create your own.
    @everywhere begin
      using Distributions
      function g(a, b)
        a + rand(Normal(0, b))
      end
    end
    close = true
  end

  W  = pmap(x -> g(x, b), A)

  if close == true
    rmprocs(workers())       #  Remove the workers you created.
  end

  return W

end

f(randn(5), 1)

注意:当我第一次运行此代码时,我需要重新编译Distributions包,因为它自上次使用以来已更新。在重新编译后立即尝试上述脚本时,它失败了。但是,我随后退出了Julia并重新打开它,然后它就正常工作了。也许这就是导致您的错误的原因?

我认为问题在于f是在一个模块中定义的。您发布的代码在REPL中执行是可以工作的。但是在模块内部定义f,然后调用f是不起作用的。我会更新问题。 - jcz

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接