Python分布式集群多进程处理

38

我正在寻找一个能够在单台计算机的不同核心之间以及分布在多台计算机上进行多进程操作的Python包。有许多不同的Python分布式计算包,但大多数似乎需要更改代码才能运行(例如,需要添加表明对象位于远程计算机上的前缀)。具体而言,我希望找到尽可能接近多进程 pool.map 函数的东西。例如,在单个计算机上脚本如下:

from multiprocessing import Pool
pool = Pool(processes = 8)
resultlist = pool.map(function, arglist)

那么分布式集群的伪代码如下:

from distprocess import Connect, Pool, Cluster

pool1 = Pool(processes = 8)
c = Connect(ipaddress)
pool2 = c.Pool(processes = 4)
cluster = Cluster([pool1, pool2])
resultlist = cluster.map(function, arglist)

可能需要比你想象的更多的设置,但你可以看一下Celery来实现分布式任务队列。http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html - monkut
我会看一下 jug - szxk
我可能最终会使用celery,但它需要大量设置,而且帮助文件很难理解(不是清晰的逐步说明,而是在结尾处附带整个脚本的不连贯说明)。Jug的文档讨论了并行化,但没有涉及跨不同计算机的并行化。 - Michael
这里还有一个相当全面的解决方案列表:https://wiki.python.org/moin/ParallelProcessing - Yibo Yang
值得注意的是,还有一个类似于pathos的更近期的解决方案——名为dask的软件包。 - Mike McKerns
5个回答

22

如果您想要非常简单的解决方案,那是不存在的。

然而,有一个具有 multiprocessing 接口的解决方案 -- pathos -- 它能够通过并行映射建立到远程服务器的连接,并进行多进程处理。

如果您想要使用 ssh-tunneled 连接,您可以这样做... 或者如果您愿意采用不太安全的方法,您也可以这样做。

>>> # establish a ssh tunnel
>>> from pathos.core import connect
>>> tunnel = connect('remote.computer.com', port=1234)
>>> tunnel       
Tunnel('-q -N -L55774:remote.computer.com:1234 remote.computer.com')
>>> tunnel._lport
55774
>>> tunnel._rport
1234
>>> 
>>> # define some function to run in parallel
>>> def sleepy_squared(x):
...   from time import sleep
...   sleep(1.0)
...   return x**2
... 
>>> # build a pool of servers and execute the parallel map
>>> from pathos.pp import ParallelPythonPool as Pool
>>> p = Pool(8, servers=('localhost:55774',))
>>> p.servers
('localhost:55774',)
>>> y = p.map(sleepy_squared, x)
>>> y
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

或者,您可以配置直接连接(无需ssh)

>>> p = Pool(8, servers=('remote.computer.com:5678',))
# use an asynchronous parallel map
>>> res = p.amap(sleepy_squared, x)
>>> res.get()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

远程服务器的工作有点棘手,您必须要先在指定端口上启动在remote.computer.com上运行的服务器 -- 同时必须确保本地主机和远程主机的设置都允许直接连接或ssh隧道连接。此外,您需要在每个主机上运行相同版本的pathospppathos分支。同时,对于ssh,您需要运行ssh-agent以允许使用ssh进行无密码登录。

但是,希望一切顺利...如果您的函数代码可以通过dill.source.importable传输到远程主机。

顺便说一下,pathos早就应该发布一个新版本了,基本上需要解决一些错误和接口更改才能发布新的稳定版本。


5
我应该提一下我是“感伤”这篇作品的作者。 - Mike McKerns
2
我强烈建议您创建一个详细的设置指南,从开始到结束,以便在服务器上运行server.py文件并在客户端上运行client.py文件后,客户端实际上可以访问服务器并在客户端和服务器之间汇集作业。阅读了这个答案和您对我的另一个问题的回答后,我仍然不确定如何(a)设置服务器或(b)建立安全连接到服务器。 - Michael
我不知道如何做(a)。设置服务器是否只是指有一个服务器,如果SSH身份验证正确,则可以运行Python?我认为您假设(b)由openSSH在Python之外处理?在您提供的示例中,您似乎建立了连接,但是tunnel对象再也没有被使用,并且remote.computer.com未包含在您创建的下一个池中。它在“而是您可以配置直接连接(无需ssh)”中被引用,但我真的不明白它是如何工作的,因为没有SSH,我怎么能够对服务器进行身份验证? - Michael
1
当您创建一个隧道时,该隧道将本地端口与远程端口连接起来。因此,您的计算机只需将所有请求发送到本地端口,隧道将通过SSH将其传送到远程服务器上。您只需要使用SSH来设置隧道,因此只需要调用它一次。从那时开始,您可以通过与自己的本地端口通信,在安全隧道中传输不安全的通信。如果您没有使用隧道,则必须告诉池连接到远程服务器。请查看一些关于SSH隧道工作原理的文档。Pathos会为您设置一个隧道。 - Mike McKerns
如果您正在使用pp与远程服务器通信,则需要在远程主机上运行ppserver。 如果您使用其他东西(zmq,...),则需要运行该类型的服务器。 Pathos确实有一些代码可以在远程主机上启动服务器,但它并不完全健壮,因为您需要保存jobid引用以关闭它,否则您需要登录并找出哪个正在运行的作业是您的服务器。 您也可以使用pathos远程执行此操作,但如果您不熟悉杀死unix进程,则不是您想要涉足的领域。 - Mike McKerns
显示剩余9条评论

21

我建议看看Ray,这正是它的目标。

Ray在单机多核设置中使用与在分布式设置中相同的语法来并行化代码。 如果您愿意使用for循环而不是map调用,则示例将如下所示。

import ray
import time

ray.init()

@ray.remote
def function(x):
    time.sleep(0.1)
    return x

arglist = [1, 2, 3, 4]

result_ids = [function.remote(x) for x in arglist]
resultlist = ray.get(result_ids)

这将使用本地所有可用核心并行运行四个任务。要在集群上运行相同的示例,唯一需要更改的是对 ray.init() 的调用。相关文档可以在这里找到。

请注意,我正在帮助开发Ray


12

我来晚了,但因为我也在寻找类似的解决方案,而且这个问题仍未得到回答,所以我想分享我的发现。

最终我使用了SCOOP。它提供了一个并行映射实现,可以跨多个核心和主机工作。在调用期间,如果需要,它还可以退回到Python的串行map函数。

从SCOOP的介绍页面上,引用如下功能:

SCOOP的功能和优势相比于futures、multiprocessing和类似模块如下:

  • 充分利用网络上多台计算机的能力;
  • 能够在任务内部生成多个任务;
  • API与PEP-3148兼容;
  • 通过仅进行轻微修改即可并行化串行代码;
  • 有效的负载平衡。

它确实有一些怪癖(函数/类必须是可pickle的),并且在多个主机上顺利运行的设置可能会很繁琐,特别是如果它们不共享相同的文件系统结构,但总体而言,我对结果感到非常满意。对于我们进行大量Numpy和Cython操作的目的来说,它提供了优秀的性能。

希望这可以帮助你。


1
“SCOOP” 比 “pathos” 功能稍弱但支持更好...但它仍然是一个不错的选择。据我所知,“pathos” 和 “SCOOP” 是提供分层并行/分布式映射的仅有的两个这样的软件包。 - Mike McKerns
1
@MikeMcKerns,我也看过Apache Spark。你能解释一下它与pathos(或SCOOP)有什么不同吗? - Michael
我们还研究了Spark、Celery、Jug和其他一些工具。考虑到我们的小型研究团队和有限的硬件资源,我们寻找的解决方案不需要采用不同的编程范式,也不需要进行大量重构,更不需要在所有工作主机上管理其他服务器进程。我们只是简单地寻求一种最具成本效益(以时间为单位)的方法来并行化已经存在的复杂代码。 - bazel
Apache Spark最初在伯克利,现在是Apache的一部分。它是更大生态系统的一部分,非常稳定,并且比pathosSCOOP得到更广泛的支持。它提供了许多不同后端的并行性,但我认为它不像 pathosSCOOP那样提供层次并行。 - Mike McKerns
2
@MikeMcKerns:我们在SCOOP上并行化了我们的代码,只用了两行代码。一行是导入语句(from scoop import futures),另一行是用SCOOP的map替换Python内置的串行map(futures.map(func,arraydata))。再也没有比这更简单的了。 - bazel
显示剩余6条评论

0

虽然有点晚,但希望对其他人有所帮助。

现在,我们可以使用mpi4py.futures(自版本3.0.0引入)和MPIPoolExecutor。请参阅文档https://mpi4py.readthedocs.io/en/stable/mpi4py.futures.html

然后,您的代码看起来与使用multiprocessing时相似。 您的Python脚本process.py将如下所示:

from mpi4py.futures import MPIPoolExecutor

def myfunc(a):
    # do something heavy here
    return a

if __name__ == '__main__':
    # init arglist
    # arglist = [ <...> ]
    with MPIPoolExecutor() as pool:
        resultlist = pool.map(myfunc, arglist)

然后通过例如批处理脚本调用

mpiexec -n <#processes, e.g. 12> python -m mpi4py.futures process.py

-1

你看过disco吗?

特点:

  • Map / Reduce范例
  • Python编程
  • 分布式共享磁盘
  • ssh底层传输
  • Web和控制台界面
  • 易于添加/阻止/删除节点
  • master启动slaves节点无需用户干预
  • slaves节点在故障的情况下会自动重新启动
  • 良好的文档。按照安装指南,我能够在几分钟内启动一个2台机器的集群(唯一需要做的是创建$DISCO_HOME/root文件夹以连接到WebUI,我猜是由于日志文件错误创建)。

来自disco文档的一个简单示例:

from disco.core import Job, result_iterator

def map(line, params):
    for word in line.split():
        yield word, 1

def reduce(iter, params):
    from disco.util import kvgroup
    for word, counts in kvgroup(sorted(iter)):
        yield word, sum(counts)

if __name__ == '__main__':
    job = Job().run(input=["http://discoproject.org/media/text/chekhov.txt"],
                    map=map,
                    reduce=reduce)
    for word, count in result_iterator(job.wait(show=True)):
        print(word, count)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接