Python中的并行处理

38

有没有一个简单的代码可以在Python 2.7中进行并行处理?我在网上找到的所有示例都很复杂,包含了不必要的代码。

如果我想编写一个简单的暴力整数因子分解程序,如何才能让每个核心(4个)分解一个整数?我的真正程序可能只需要2个核心,并且需要共享信息。

我知道有parallel-python和其他库存在,但我想尽可能减少使用的库的数量,因此我想使用thread和/或multiprocessing库,因为它们与Python一起提供。


这里有另一种方法,我在这里解释了:最小活动线程进程 - Vikas Gautam
4个回答

32
一个好的在 Python 中开始并行处理的简单方法就是使用 multiprocessing 中的池映射 -- 它类似于通常的 Python 映射,但是单个函数调用会分布到不同数量的进程中。因子分解是一个很好的例子,你可以将所有可用任务分散在不同的任务上来进行暴力检查。
from multiprocessing import Pool
import numpy

numToFactor = 976

def isFactor(x):
    result = None
    div = (numToFactor / x)
    if div*x == numToFactor:
        result = (x,div)
    return result

if __name__ == '__main__':
    pool = Pool(processes=4)
    possibleFactors = range(1,int(numpy.floor(numpy.sqrt(numToFactor)))+1)
    print 'Checking ', possibleFactors
    result = pool.map(isFactor, possibleFactors)
    cleaned = [x for x in result if not x is None]
    print 'Factors are', cleaned

这给了我

Checking  [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31]
Factors are [(1, 976), (2, 488), (4, 244), (8, 122), (16, 61)]

2
我应该补充一下,上面的代码是可以工作的,但可能不会表现出惊人的并行性能,因为你所调用的开销(并行映射+函数调用)都是为了计算少量的工作(一点整数运算)。我将把如何分摊开销到更多的除法上留给读者思考——例如,如何更改上面的代码,使“isFactor”一次调用适用于多个除法。 - Jonathan Dursi
示例代码出现错误 AttributeError: Can't get attribute 'isFactor' on <module '__main__' (built-in)> - StatguyUser
1
Python 3 注意事项:在 isFactor(x) 中,用整数除法(//)代替 numToFactor / x。 - user8866053

8

mincemeat 是我发现的最简单的map/reduce实现。而且,它非常轻便,只有一个文件,并且使用标准库完成所有操作。


有趣...我会研究一下。 - calccrypto
这并不是我真正想要的。 - calccrypto
1
@calccrypto 为什么不呢?了解为什么mincemeat不完美可能会帮助其他人找到更好的解决方案。 - Annika Backstrom
2
这更多是针对数据库、服务器和类似的东西(不仅限于一台计算机)。我只是想同时运行多个函数。 - calccrypto

1

我认为如果你想保持在标准库内,使用multiprocessing中的Pool可能是最好的选择。如果你有兴趣进行其他类型的并行处理,但不想学习任何新知识(即仍然使用与multiprocessing相同的接口),那么你可以尝试使用pathos,它提供了几种形式的并行映射,并且基本上具有与multiprocessing相同的接口。

Python 2.7.6 (default, Nov 12 2013, 13:26:39) 
[GCC 4.2.1 Compatible Apple Clang 4.1 ((tags/Apple/clang-421.11.66))] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import numpy
>>> numToFactor = 976
>>> def isFactor(x):
...   result = None
...   div = (numToFactor / x)
...   if div*x == numToFactor:
...     result = (x,div)
...   return result
... 
>>> from pathos.multiprocessing import ProcessingPool as MPool
>>> p = MPool(4)
>>> possible = range(1,int(numpy.floor(numpy.sqrt(numToFactor)))+1)
>>> # standard blocking map
>>> result = [x for x in p.map(isFactor, possible) if x is not None]
>>> print result
[(1, 976), (2, 488), (4, 244), (8, 122), (16, 61)]
>>>
>>> # asynchronous map (there's also iterative maps too)
>>> obj = p.amap(isFactor, possible)                  
>>> obj
<processing.pool.MapResult object at 0x108efc450>
>>> print [x for x in obj.get() if x is not None]
[(1, 976), (2, 488), (4, 244), (8, 122), (16, 61)]
>>>
>>> # there's also parallel-python maps (blocking, iterative, and async) 
>>> from pathos.pp import ParallelPythonPool as PPool
>>> q = PPool(4)
>>> result = [x for x in q.map(isFactor, possible) if x is not None]
>>> print result
[(1, 976), (2, 488), (4, 244), (8, 122), (16, 61)]

另外,pathos 有一个与其接口相同的姊妹包叫做 pyina,它运行在 mpi4py 上,并提供了运行在 MPI 中的并行映射,可以使用多个调度程序运行。
另一个优点是,pathos 自带比标准 Python 更好的序列化器,因此它比 multiprocessing 更能够序列化一系列函数和其他东西。而且你可以从解释器中完成所有操作。
>>> class Foo(object):
...   b = 1
...   def factory(self, a):
...     def _square(x):
...       return a*x**2 + self.b
...     return _square
... 
>>> f = Foo()
>>> f.b = 100
>>> g = f.factory(-1)
>>> p.map(g, range(10))
[100, 99, 96, 91, 84, 75, 64, 51, 36, 19]
>>> 

在这里获取代码: https://github.com/uqfoundation


0

使用Ray可以优雅地完成这个任务,它是一个系统,可以轻松地并行和分发您的Python代码。

要并行化您的示例,您需要使用@ray.remote装饰器定义映射函数,然后使用.remote调用它。这将确保每个远程函数实例都在不同的进程中执行。

import ray

ray.init()

# Define the function to compute the factors of a number as a remote function.
# This will make sure that a call to this function will run it in a different
# process.
@ray.remote
def compute_factors(x):
    factors = [] 
    for i in range(1, x + 1):
       if x % i == 0:
           factors.append(i)
    return factors    

# List of inputs.
inputs = [67, 24, 18, 312]

# Call a copy of compute_factors() on each element in inputs.
# Each copy will be executed in a separate process.
# Note that a remote function returns a future, i.e., an
# identifier of the result, rather that the result itself.
# This enables the calls to remote function to not be blocking,
# which enables us to call many remote function in parallel. 
result_ids = [compute_factors.remote(x) for x in inputs]

# Now get the results
results = ray.get(result_ids)

# Print the results.
for i in range(len(inputs)):
    print("The factors of", inputs[i], "are", results[i]) 

使用 Ray 模块相比于 multiprocessing 模块有许多优势。特别是,相同的代码可以在单台机器上以及机器集群上运行。更多关于 Ray 的优势请参见this related post


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接