多进程:执行顺序

4

我正在尝试以下代码:

from multiprocessing import Pool

def f(x):    
    return x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13]))

据我理解,这5个处理器会分别获得编号为0, 1, 2, 3, 4的任务进行处理。如果处理器1完成了它的任务,那么它会立即获得5这个任务,而其他处理器仍在处理1,2,3,4这些任务。如果出现后一种情况,那么如何实现上述代码以便于当一个处理器处于空闲状态时能够立即被分配新的任务?
如何测试该实现呢?
2个回答

2
线程池立即生成一个新线程(添加到您的示例中)。请注意,线程4花费了足够长的时间,以便第12个任务能够开始。
PS:我刚刚注意到您忘记了10。
from multiprocessing import Pool
import time
import random

def f(x):
    print "Enter %s" % x
    time.sleep( random.randrange(1,100,1)/10.0 )
    print "Exit %s" % x
    return x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13]))

Enter 0
Enter 1
Enter 2
Enter 3
Enter 4
Exit 0
Enter 5
Exit 3
Enter 6
Exit 2
Enter 7
Exit 5
Enter 8
Exit 1
Enter 9
Exit 6
Enter 11
Exit 11
Enter 12
Exit 4
Enter 13
Exit 7
Exit 12
Exit 9
Exit 8
Exit 13
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13]

1

是的,这种情况是可能的。 首先将输入分成单独的任务。当任务的大小(以处理时间为准)不相等且数量太少无法填补空缺时,问题就出现了。

来自文档

map(func, iterable[, chunksize])

此方法将可迭代对象切割成多个块,并将其作为单独的任务提交给进程池。可以通过将chunksize设置为正整数来指定这些块的(近似)大小。

示例

为了说明这种行为,我更改了f(x),使其需要x秒钟才能完成。

from multiprocessing import Pool
import time
import threading

def f(x):
    print('x: ' + str(x) + '\tThread ID: ' + str(threading.get_ident()))
    time.sleep(x)

if __name__ == '__main__':
    chunksize = 3
    with Pool(2) as p:
        p.map(f, [10, 1, 1, 1, 1, 1], chunksize)

输入数组[10, 1, 1, 1, 1, 1]被分成了len(arr) / chunksize = 2组:
[10, 1, 1]  # For thread 1, takes 12 seconds to finish
[ 1, 1, 1]  # For thread 2, takes 3 seconds to finish

因此,线程2将在3秒后完成,而线程1将继续工作9秒钟。

示例输出:

x: 10   Thread ID: 8556
x: 1    Thread ID: 59180
x: 1    Thread ID: 59180
x: 1    Thread ID: 59180
x: 1    Thread ID: 8556
x: 1    Thread ID: 8556

如果您发现自己处于这种情况,那么您可以强制使用较小的“chunksize”。一个值为1确保了尽可能平衡的工作负载,但代价是更高的开销。

我觉得你对这个线程池有一些重要的东西要分享,但是,所有的"1"的例子让人很难追踪。 - Jon Malachowski
我只是用1来简化这个例子。:/ 在这个例子中,工作将被分成两部分,线程1将花费12秒钟,而线程2将花费3秒钟。 - msitt

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接