concurrent.futures问题:为什么只有1个工作线程?

3
我正在尝试使用concurrent.futures.ProcessPoolExecutor来并行化串行任务。该串行任务涉及从数字范围中查找给定数字的出现次数。我的代码如下。
在执行期间,我注意到从任务管理器/系统监视器/top中只有一个cpu/线程不断运行,尽管对processPoolExecutor的max_workers参数赋了大于1的值。为什么会出现这种情况?我如何使用concurrent.futures将我的代码并行化?我的代码是在python 3.5上执行的。
import concurrent.futures as cf
from time import time

def _findmatch(nmax, number):    
    print('def _findmatch(nmax, number):')
    start = time()
    match=[]
    nlist = range(nmax)
    for n in nlist:
        if number in str(n):match.append(n)
    end = time() - start
    print("found {} in {}sec".format(len(match),end))
    return match

def _concurrent(nmax, number, workers):
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        start = time()
        future = executor.submit(_findmatch, nmax, number)
        futures = future.result()
        found = len(futures)
        end = time() - start
        print('with statement of def _concurrent(nmax, number):')
        print("found {} in {}sec".format(found, end))
    return futures

if __name__ == '__main__':
    match=[]
    nmax = int(1E8)
    number = str(5) # Find this number
    workers = 3
    start = time()
    a = _concurrent(nmax, number, workers)
    end = time() - start
    print('main')
    print("found {} in {}sec".format(len(a),end))
2个回答

1
运行您的代码显示出所有三个工人都在那里,但其中两个正在休眠。问题在于,executor.submit(_findmatch, nmax, number) 只告诉一个工人执行函数_findmatch
我不理解您的代码在做什么,但基本上您需要:
  • 将任务分成三个等份,并使用executor.submit将每个部分发送到一个进程。
  • 将任务分成更小的块(比如一个由100个元素组成的块),并使用map,这样每个_findmatch只获取它所分配的块。

1
你的代码问题在于它只提交了一个任务,而其他工人则无所事事。你需要提交多个任务,以便工人可以并行执行。
下面的示例将搜索区域分成三个不同的任务,每个任务由不同的工人执行。submit 返回的 Futures 添加到列表中,一旦所有任务都被提交,就使用 wait 等待它们全部完成。如果在提交任务后立即调用 result,它将阻塞直到 Future 完成。
请注意,以下代码不是生成数字列表,而是计算其中包含数字 5 的数字数量,以减少内存使用:
import concurrent.futures as cf
from time import time

def _findmatch(nmin, nmax, number):
    print('def _findmatch', nmin, nmax, number)
    start = time()
    count = 0
    for n in range(nmin, nmax):
        if number in str(n):
            count += 1
    end = time() - start
    print("found {} in {}sec".format(count,end))
    return count

def _concurrent(nmax, number, workers):
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        start = time()
        chunk = nmax // workers
        futures = []

        for i in range(workers):
            cstart = chunk * i
            cstop = chunk * (i + 1) if i != workers - 1 else nmax

            futures.append(executor.submit(_findmatch, cstart, cstop, number))

        cf.wait(futures)
        res = sum(f.result() for f in futures)
        end = time() - start
        print('with statement of def _concurrent(nmax, number):')
        print("found {} in {}sec".format(res, end))
    return res

if __name__ == '__main__':
    match=[]
    nmax = int(1E8)
    number = str(5) # Find this number
    workers = 3
    start = time()
    a = _concurrent(nmax, number, workers)
    end = time() - start
    print('main')
    print("found {} in {}sec".format(a,end))

输出:

def _findmatch 0 33333333 5
def _findmatch 33333333 66666666 5
def _findmatch 66666666 100000000 5
found 17190813 in 20.09431290626526sec
found 17190813 in 20.443560361862183sec
found 22571653 in 20.47660517692566sec
with statement of def _concurrent(nmax, number):
found 56953279 in 20.6196870803833sec
main
found 56953279 in 20.648695707321167sec

谢谢。在我消化你的建议的同时,我有一个问题。为什么需要手动创建块?concurrent.futures.ProcessPoolExecutor不应该自动将解决给定函数的工作分配给其工作池中的工作人员吗? - Sun Bear
@SunBear:作为程序员,你的工作是将工作分成可以由工人独立运行的块。ProcessPoolExecutor负责调用给定的块由工人运行。请注意,在示例中,我可以将任务分成10个不同的任务,最终结果将是相同的(当然,控制台输出将不同,因为_findmatch将运行10次)。 - niemmi
谢谢你的指点。我已经重写了代码,输出一个包含出现数字的列表。我会在我的下一个问题中发布它,并与executor.map()的性能进行比较。 - Sun Bear
我已经对.submit().map()进行了基准测试,与串行代码进行了比较 https://dev59.com/eOk5XIcBkEYKwwoY_O-N。如果您有时间,请发表评论。 - Sun Bear

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接