joblib的多进程无法并行化?

3
自从我从python3.5升级到3.6后,使用joblib进行并行计算时未能减少计算时间。 以下是安装的库版本: - python: 3.6.3 - joblib: 0.11 - numpy: 1.14.0
基于一个非常著名的例子,以下是一个可重现该问题的示例代码:
import time
import numpy as np
from joblib import Parallel, delayed

def square_int(i):
    return i * i

ndata = 1000000 
ti = time.time()
results = []    
for i in range(ndata):
    results.append(square_int(i))

duration = np.round(time.time() - ti,4)
print(f"standard computation: {duration} s" )

for njobs in [1,2,3,4] :
    ti = time.time()  
    results = []
    results = Parallel(n_jobs=njobs, backend="multiprocessing")\
        (delayed(square_int)(i) for i in range(ndata))
    duration = np.round(time.time() - ti,4)
    print(f"{njobs} jobs computation: {duration} s" )

我得到了以下输出:
  • 标准计算:0.2672秒
  • 1个作业的计算:352.3113秒
  • 2个作业的计算:6.9662秒
  • 3个作业的计算:7.2556秒
  • 4个作业的计算:7.097秒

当我将ndata的数量增加10倍并删除1个核心计算时,我得到了以下结果:

  • 标准计算:2.4739秒
  • 2个作业的计算:77.8861秒
  • 3个作业的计算:79.9909秒
  • 4个作业的计算:83.1523秒

有没有人对我应该探究哪个方向有想法?

1个回答

9
我认为主要原因是你并行的开销超过了利益。换句话说,你的square_int太简单了,无法通过并行获得任何性能提升。这个square_int太简单了,进程间传递输入和输出可能比执行square_int函数本身还需要更长的时间。
我通过创建一个square_int_batch函数来修改你的代码。虽然计算时间仍然比串行实现长,但已经大大减少了。
import time
import numpy as np
from joblib import Parallel, delayed

def square_int(i):
    return i * i

def square_int_batch(a,b):
    results=[]
    for i in range(a,b):
        results.append(square_int(i))
    return results

ndata = 1000000 
ti = time.time()
results = []    
for i in range(ndata):
    results.append(square_int(i))

# results = [square_int(i) for i in range(ndata)]

duration = np.round(time.time() - ti,4)
print(f"standard computation: {duration} s" )

batch_num = 3
batch_size=int(ndata/batch_num)

for njobs in [2,3,4] :
    ti = time.time()  
    results = []
    a = list(range(ndata))
#     results = Parallel(n_jobs=njobs, )(delayed(square_int)(i) for i in range(ndata))
#     results = Parallel(n_jobs=njobs, backend="multiprocessing")(delayed(
    results = Parallel(n_jobs=njobs)(delayed(
        square_int_batch)(i*batch_size,(i+1)*batch_size) for i in range(batch_num))
    duration = np.round(time.time() - ti,4)
    print(f"{njobs} jobs computation: {duration} s" )

计算时间如下:

standard computation: 0.3184 s
2 jobs computation: 0.5079 s
3 jobs computation: 0.6466 s
4 jobs computation: 0.4836 s

以下是几个建议,可以帮助缩短时间。

  1. 在你的特定情况下,使用列表推导式results = [square_int(i) for i in range(ndata)]替换for循环,它更快。我测试过了。
  2. batch_num设置为合理的大小。这个值越大,开销就越大。在我的情况下,当batch_num超过1000时,速度开始显著变慢。
  3. 我使用了默认后端loky而不是multiprocessing。至少在我的情况下,它略微更快。

从其他一些SO问题中,我读到多进程对于cpu密集型任务效果很好,但我没有官方定义。您可以自行探索。


1
嗨,我也认为函数square_int太简单了。非常感谢您上次的建议,我会尝试一下。 - phypho

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接